-
[Kafka] Producer와 ConsumerData Engineer 2023. 5. 16. 19:11728x90
Producer
- 카프카의 토픽으로 메시지를 전송하는 역할
- 원하는 형태에 따라 옵션을 변경하면서 다양한 방법으로 카프카에 메시지를 전송
Producer 디자인
*ProducerRecord : 카프카로 전송하기 위한 실제 데이터
- Topic(필수, 메시지 전송 주소) / Partition(특정 파티션을 지정) / Key(특정 파티션에 레코드들을 정렬) / Value(필수, 메시지 전송)
- Kafka의 특정 토픽으로 메시지를 전송
각 레코드들은 프로듀서의 send() 메서드를 통해 시리얼라이저, 파티셔너를 거치게 된다. 이때 프로듀서 레코드의 선택사항인 파티션 지정 시, 파티셔너는 아무 동작도 하지 않고 지정된 파티션으로 레코드를 전달한다. 파티션을 지정하지 않은 경우 키를 가지고 파티션을 선택해 레코드를 전달하는데, 기본적으로 Round Robin 방식을 따른다. 이렇게 프로듀서 내부에는 send() 메서드 동작 이후 레코드들을 파티션별로 잠시 모아두게 되는데, 그 이유는 프로듀서가 카프카로 전송하기 전 배치 전송을 하기 위함이다. 전송 실패 시 재시도 동작이 이뤄지고, 지정된 횟수만큼의 재시도가 실패하면 최종 실패를 전달하며, 전송 성공 시 메타데이터를 리턴한다.
+@) 이 질문의 이유는 보아즈 발제에서 다른 분이 파티셔너의 메시지 전달 방식인 Round Robin 옵션을 바꿀수 있냐고 말씀을 하셨는데, partitioner.class 환경 변수를 재설정하면 가능하다고 한다.
프로듀서의 주요 옵션
Producer Option Explanation bootstrap.servers Kafka 클러스터는 클러스터 마스터의 개념이 없어 클러스터 내 모든 서버가 클라이언트의 요청을 받을 수 있다. 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트와 포트 정보를 나타낸다. client.dns.lookup 하나의 호스트에 여러 IP를 매핑해 사용하는 일부 환경에서 클라이언트가 하나의 IP와 연결하지 못할 경우
- use_dns_ips(Default) : DNS에 할당된 호스트의 모든 IP를 쿼리하고 저장
- 첫번째 IP로 접근 실패 시 종료하지 않고 다음 IP로 접근 시도
- resolve_canonical_bootstrap_servers_only : Kerberos 환경에서 FQDN을 얻기 위한 용도로 사용acks - 프로듀서가 카프카 토픽의 리더 측에 메시지를 전송한 후 요청이 완료하기를 결정하는 옵션
- 0, 1, all(-1) 표현
- 0 : 일부 메시지 손실 대신 빠른 전송
- 1 : 리더가 메시지를 받았는지 확인하나 모든 팔로워를 전부 확인하지 않음
- all : 팔로워가 메시지를 받았는지 여부 확인. 다소 느리나 하나의 팔로워가 있다면 메시지는 손실되지 않음buffer.memory 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트 compression.type 프로듀서가 메시지 전송 시 선택할 수 있는 압축 타입
- none, gzip, snappy, lz4, zstd 등 원하는 타입 선택enable.idempotence - 설정을 true로 할 경우 중복 없는 전송이 가능
- max.in.flight.requests.per.connection = 5 이하, retries = 0 이상, acks = all로 지정해야 함max.in.flight.requests.per.connection - 하나의 커넥션에서 프로듀서가 최대한 ACK 없이 전송할 수 있는 요청 수
- 메시지의 순서가 중요하다면 1로 설정할 것을 권장하지만 성능은 다소 떨어짐retries 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수 batch.size 프로듀서는 동일한 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도함.
- 적절한 배치 크기 설정은 성능에 도움linger.ms 배치 형태의 메시지를 보내기 전에 추가적인 메시지를 위해 기다리는 시간을 조정
- 배치 크기에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을 때 메시지를 전송transactional.id - '정확히 한 번 전송'을 위해 사용하는 옵션이며, 동일한 TransactionalId에 한해 정확히 한 번을 보장. 옵션을 사용하기 전 enable.idempotenc를 true로 설정 Producer Example
- 프로듀서의 전송 방법은 메시지를 보내고 확인하지 않기 / 동기 전송 / 비동기 전송의 3가지 방식
<메시지를 보내고 확인하지 않기>
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProduceFireForgot { public static void main(String[] args) { Properties props = new Properties(); // Properties 객체 생성 props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // Broker 리스트 정의 // 메시지 키와 밸류는 문자열 타입이므로 카프카의 기본 StringSerializer 지정 props.put("Key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Properties 객체를 전달해 새 프로듀서 생성 Producer <String, String> producer = new KafkaProducer<>(props); try{ for (int i = 0, i < 3, i++){ ProducerRecord<Stirng, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distiributed streaming platform - " + i); // ProducerRecord 생성 // send() 메서드 이용 메시지 전송 후 자바 Future 객체로 전송하나 리턴값 무시로 전송 성공 여부 x producer.send(record); } } catch (Exception e){ // 카프카 브로커의 메시지 전송 후 에러 무시 / 전송 전 에러 발생 시 예외처리 e.printStackTrace(); } finally { producer.close(); // 프로듀서 종료 } } }
동기 전송
: 메시지 전송 성공 여부 파악 가능
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; public class ProducerSync { public static void main(String[] args) { Properties props = new Properties(); // Properties 객체 생성 props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // Broker 리스트 정의 // 메시지 키와 밸류는 문자열 타입이므로 카프카의 기본 StringSerializer 지정 props.put("Key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Properties 객체를 전달해 새 프로듀서 생성 Producer <String, String> producer = new KafkaProducer<>(props); try{ for (int i = 0, i < 3, i++){ ProducerRecord<Stirng, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distiributed streaming platform - " + i); // ProducerRecord 생성 // get 메서드를 이용해 카프카 응답 대기. 메시지 전송 실패 시 예외 발생 // 에러가 없다면 RecordMetadata를 얻음 RecordMetadata metadata = producer.send(record).get(); System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value()); } } catch (Exception e){ // 카프카 브로커의 메시지 전송 전과 보내는 동안 에러 발생 시 예외 발생 e.printStackTrace(); } finally { producer.close(); // 프로듀서 종료 } } }
콜백
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; // 콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback 구현 클래스 선언 public class PeterProducerCallback implements Callback { private ProducerRecord<String, String> record; public PeterProducerCallback(ProducerRecord<String, String> record){ this.record = record; } @Override public void onCompletion(RecordMetadata metadata, Exception e){ if (e != null){ e.printStackTrace(); // 카프카 오류 리턴 시 onCompletion() : 예외 // 실제 운영 환경에선 추가적인 예외 처리 필요 } else { System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value()); } } }
비동기 전송
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerAsync { public static void main(String[] args) { Properties props = new Properties(); // Properties 객체 생성 props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // Broker 리스트 정의 // 메시지 키와 밸류는 문자열 타입이므로 카프카의 기본 StringSerializer 지정 props.put("Key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Properties 객체를 전달해 새 프로듀서 생성 Producer <String, String> producer = new KafkaProducer<>(props); try{ for (int i = 0, i < 3, i++){ ProducerRecord<Stirng, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distiributed streaming platform - " + i); // ProducerRecord 생성 // 프로듀서에서 레코드를 보낼 때 콜백 객체를 같이 보냄 producer.send(record, new PeterProducerCallback(record)); } } catch (Exception e){ // 카프카 브로커의 메시지 전송 전과 보내는 동안 에러 발생 시 예외 발생 e.printStackTrace(); } finally { producer.close(); // 프로듀서 종료 } } }
- 프로듀서는 send() 메서드와 콜백을 함께 호출
- 동기 전송과 같이 프로듀서가 보낸 모든 메시지에 대해 응답을 기다리면 많은 시간을 소비하게 되므로 빠른 전송 불가
- 비동기 방식으로 전송하면 빠른 전송이 가능하고, 메시지 전송이 실패한 경우라도 예외를 처리 가능
-> 에러 로그 등에 기록 가능
Consumer
- 카프카의 토픽에 저장되어 있는 메시지를 가져오는 역할
- 컨슈머 그룹, 리밸런싱 등 여러 동작을 수행
프로듀서가 아무리 빠르게 카프카로 메시지를 전송하더라도 컨슈머가 카프카로부터 빠르게 메시지를 읽어오지 못한다면 결국 지연이 발생하므로, 컨슈머의 역할은 매우 중요하다.
컨슈머의 기본 동작
- 프로듀서가 카프카의 토픽으로 메시지를 전송하면 해당 메시지들은 브로커들의 로컬 디스크에 저장
- 컨슈머 그룹 : 하나 이상의 컨슈머들이 모여 있는 그룹을 의미하며, 컨슈머는 반드시 컨슈머 그룹에 속한다. 컨슈머 그룹은 각 파티션의 리더에게 카프카 토픽에 저장된 메시지를 가져오기 위한 요청을 보내는데, 이때 파티션 수와 컨슈머 수는 일대일로 매핑되는 것이 이상적이다.
*컨슈머 수가 파티션 수보다 더 많다고 해서 더 빠르게 토픽의 메시지를 가져오거나 처리량이 높아지지 않고, 더 많은 수의 컨슈머들이 그냥 대기 상태로만 존재하기 때문이다. 컨슈머 그룹 내 리밸런싱 동작을 통해 장애가 발생한 컨슈머의 역할을 동일한 그룹에 있는 다른 컨슈머가 그 역할을 대신 수행하므로 장애 대비를 위한 추가 컨슈머 리소스를 할당하지 않아도 된다.
컨슈머의 주요 옵션
Consumer Option Explanation bootstrap.servers 프로듀서와 동일하게 브로커의 정보 입력 fetch.min.bytes - 한 번에 가져올 수 있는 최소 데이터 크기
- 지정한 크기보다 작은 경우, 요청에 응답하지 않고 데이터가 누적될 때까지 대기group.id 컨슈머가 속한 컨슈머 그룹 식별하는 식별자
- 동일한 그룹 내 컨슈머 정보는 모두 공유heartbeat.interval.ms - heartbeat : 컨슈머 상태 = active
- session.timeout.ms와 밀접한 관련
- session.timeout.ms보다 낮은 값으로 설정(일반적으로 1/3)max.partition.fetch.bytes 파티션당 가져올 수 있는 최대 크기 session.timeout.ms - 컨슈머가 종료된 것인지를 판단
- 컨슈머는 주기적으로 하트비트를 보내야 함
- 초과 시 해당 컨슈머를 종료 상태로 판단, 컨슈머 그룹에서 제외하고 리밸런싱 시작enable.auto.commit 백그라운드로 주기적인 오프셋 커밋 auto.offset.reset 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우
- earliest : 가장 초기의 오프셋값으로 설정
- latest : 가장 마지막의 오프셋값으로 설정
- none : 이전 오프셋값을 찾지 못하면 에러fetch.max.bytes 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기 group.instance.id 컨슈머의 고유 식별자
- 설정시 static 멤버로 간주, 불필요한 리밸런싱 xisolation.level 트랜잭션 컨슈머에서 사용되는 옵션
- read_uncommitted : 기본값으로 모든 메시지를 읽음
- read_committed : 트랜잭션 완료 메시지만 읽기max.poll.records 한 번의 poll() 요청으로 가져오는 최대 메시지 수 partition.assignment.strategy 파티션 할당 전략으로, 기본값은 range fetch.max.wait.ms fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간 컨슈머 예제
: 오토 커밋 / 동기 가져오기 / 비동기 가져오기
오토 커밋
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerAuto { // Properties 객체 생성 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // 브로커 리스트 정의 props.put("group.id", "peter-consumer01"); // 컨슈머 그룹 아이디 정의 props.put("enable.auto.commit", "true"); // 오토 커밋 사용 props.put("auto.offset.reset", "latest"); // 컨슈머 오프셋을 찾지 못할 때 latest로 초기화 // 문자열을 사용했으므로 StringDeserializer 지정 props.put("key.deserializer", "org.apache.kafka.common.serialization. Deserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization. Deserializer"); // Properties 객체를 전달해 새 컨슈머 생성 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("peter-basic01")); // 구독할 토픽 지정 try { while (true){ // 메시지를 가져오기 위해 카프카에 지속적인 poll() 요청 // 폴링 계속 유지 + 타임아웃 주기를 설정해 해당 시간만큼 블록 ConsumerRecords<String, String> records = consumer.poll(1000); // poll() -> 레코드 전체를 리턴 : 하나의 메시지 x , 반복문 처리 for (ConsumerRecord<String, String> record : records){ System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } catch (Excepion e) { e.printStackTrace(); } finally { consumer.close(); // 컨슈머 종료 } } }
- 컨슈머 애플리케이션들의 기본값
- 오프셋을 주기적으로 커밋하므로 관리자가 따로 관리하지 않아도 되는 장점
- 컨슈머 종료 등이 빈번히 일어나면 일부 메시지를 못 가져오거나 중복으로 가져오는 경우 존재
Kafka가 안정적으로 잘 동작하고 컨슈머 역시 한번 구동하고 나면 자주 변경되거나 종료되는 현상이 없으므로 오토 커밋을 사용하는 경우가 많음
동기 가져오기
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerSync { // Properties 객체 생성 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // 브로커 리스트 정의 props.put("group.id", "peter-consumer01"); // 컨슈머 그룹 아이디 정의 props.put("enable.auto.commit", "false"); // 오토 커밋 사용 x props.put("auto.offset.reset", "latest"); // 컨슈머 오프셋을 찾지 못할 때 latest로 초기화 // 문자열을 사용했으므로 StringDeserializer 지정 props.put("key.deserializer", "org.apache.kafka.common.serialization. Deserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization. Deserializer"); // Properties 객체를 전달해 새 컨슈머 생성 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("peter-basic01")); // 구독할 토픽 지정 try { while (true){ // 메시지를 가져오기 위해 카프카에 지속적인 poll() 요청 // 폴링 계속 유지 + 타임아웃 주기를 설정해 해당 시간만큼 블록 ConsumerRecords<String, String> records = consumer.poll(1000); // poll() -> 레코드 전체를 리턴 : 하나의 메시지 x , 반복문 처리 for (ConsumerRecord<String, String> record : records){ System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 현재 배치를 통해 읽은 모든 메시지 처리 후 추가 메시지 폴링 전 현재 오프셋 동기 커밋 consumer.commitSync(); } } catch (Excepion e) { e.printStackTrace(); } finally { consumer.close(); // 컨슈머 종료 } } }
- 오토 커밋과 달리 poll()을 이용 메시지를 가져온 후 처리까지 완료하고 현재의 오프셋을 커밋
- 속도는 느리나 메시지 손실은 거의 발생하지 않음
*메시지 손실 : 실제로 토픽에는 메시지가 존재하나 잘못된 오프셋 커밋으로 인한 위치 변경으로 컨슈머가 메시지를 가져오지 못하는 경우
메시지가 손실되면 안 되는 중요한 처리 작업들은 동기 방식으로 진행하는 것을 권장하나, 메시지의 중복 이슈는 피할 수 없다.
비동기 가져오기
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerAsync { // Properties 객체 생성 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092, peter-kafka02.foo.bar:9092, peter-kafka03.foo.bar:9092"); // 브로커 리스트 정의 props.put("group.id", "peter-consumer01"); // 컨슈머 그룹 아이디 정의 props.put("enable.auto.commit", "false"); // 오토 커밋 사용 x props.put("auto.offset.reset", "latest"); // 컨슈머 오프셋을 찾지 못할 때 latest로 초기화 // 문자열을 사용했으므로 StringDeserializer 지정 props.put("key.deserializer", "org.apache.kafka.common.serialization. Deserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization. Deserializer"); // Properties 객체를 전달해 새 컨슈머 생성 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("peter-basic01")); // 구독할 토픽 지정 try { while (true){ // 메시지를 가져오기 위해 카프카에 지속적인 poll() 요청 // 폴링 계속 유지 + 타임아웃 주기를 설정해 해당 시간만큼 블록 ConsumerRecords<String, String> records = consumer.poll(1000); // poll() -> 레코드 전체를 리턴 : 하나의 메시지 x , 반복문 처리 for (ConsumerRecord<String, String> record : records){ System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 현재 배치를 통해 읽은 모든 메시지 처리 후 추가 메시지 폴링 전 현재 오프셋 비동기 커밋 consumer.commitAsync(); } } catch (Excepion e) { e.printStackTrace(); } finally { consumer.close(); // 컨슈머 종료 } } }
- consumer.Async() : 오프셋 커밋을 실패하더라도 재시도하지 않음
Ex. 총 10개의 메시지, 오프셋 1번부터 10번까지 순차적으로 커밋한다고 가정
[비동기 방식]
1. 1번 오프셋의 메시지를 읽은 뒤 1번 오프셋을 비동기 커밋(현재 last : 1)
2. 2번 오프셋의 메시지를 읽은 뒤 2번 오프셋을 비동기 커밋하나 실패 (현재 last : 1)
3. 3번 오프셋의 메시지를 읽은 뒤 3번 오프셋을 비동기 커밋하나 실패 (현재 last : 1)
4. 2번 오프셋의 메시지를 읽은 뒤 4번 오프셋을 비동기 커밋하나 실패 (현재 last : 1)
5. 5번 오프셋의 메시지를 읽은 뒤 5번 오프셋을 비동기 커밋 (현재 last : 5)
- 현재 5번 오프셋의 메시지를 읽고 비동기 커밋까지 성공했으므로 현재 마지막 오프셋은 5
- 비동기 커밋의 재시도로 인해 2번 오프셋의 비동기 커밋이 성공하게 되면 마지막 오프셋이 2로 변경
- 현재의 컨슈머가 종료되고 다른 컨슈머가 이어서 작업을 하게 될 경우 다시 3번 오프셋부터 메시지를 가져옴 -> 비동기 커밋 재시도로 인해 천 단위, 만 단위의 오프셋이 커밋되어 버리면 그 수만큼 메시지가 계속 중복됨. 따라서 비동기의 경우 커밋 재시도를 하지 않음(콜백을 같이 사용하는 경우도 존재)
728x90'Data Engineer' 카테고리의 다른 글
[Kafka] 프로듀서의 동작과 원리 (0) 2023.05.26 [Kafka] 카프카의 내부 동작원리 (0) 2023.05.20 [Kafka] 정리_01 (0) 2023.05.14 [Data Engineer] Kafka란 무엇인가 (0) 2023.05.07 SaaS, Software as a Service (0) 2023.04.02