ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Apache Kafka - 부분 내용 정리 ②
    JAVA공부/JAVA 2024. 1. 15. 02:31

    카프카 클라이언트 

    카프카 클러스터에 명령을 내리거나 데이터를 송수신 하기 위해 

    카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용하여

    애플리케이션을 개발한다.

     

    카프카 클라이언트는 라이브러리이기 때문에 자체 라이프사이클을 가진 프레임워크 혹은

    애플리케이션 위에서 구현 및 실행해야 한다.

     

     

     

    프로듀서 API

    프로듀서 애플리케이션은 카프카에 필요한 데이터 선언 및 브로커 특정 토픽 파티션에 전송한다.

    이때, 리더 파티션을 가지고 있는 카프카 브로커와 통신한다.

     

    프로듀서는 데이터를 직렬화하여 카프카 브로커로 전송한다.

    직렬화를 사용하면 자바 기본형, 참조형 뿐 아니라, 동영상 이미지 같은 바이너리 데이터도 전송가능하다.

     

    카프카 공식 자바 라이브러리를 사용하여 프로듀서를 개발해본다.

     

    의존성 추가 

    dependencies {
        implementation 'org.apache.kafka:kafka-clients:3.6.1'
        implementation 'org.slf4j:slf4j-simple:2.0.11'
        testImplementation platform('org.junit:junit-bom:5.9.1')
        testImplementation 'org.junit.jupiter:junit-jupiter'
    }

     

     

    ※ 주의

    카프카 클라이언트 버전과 클러스터 버전을 항상 맞추도록 하자. 버전업이 되면서 새로운 API가 나오거나

    변경 될 수 있기 때문이다.

     

     

     

        private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "wsl.kafka:9092";
    
        public static void main(String[] args) {
    
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            KafkaProducer kafkaProducer = new KafkaProducer<>(configs);
    
            String testMessage = "value 1";
    
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, testMessage);
    
            kafkaProducer.send(record);
    
            logger.info(">>>> Send Message = {}", record);
    
            kafkaProducer.flush();
            kafkaProducer.close();
        }

     

     

    상위 항목들은 설정에 관한 목록이고

    ProducerRecord 클래스를 통해 토픽과, 메시지 내용을 담아 KafkaProducer 클래스로 전송한다.

    이때 ProducerRecord 생성자 파라미터에 메시지 키값을 넣지 않았는데, 이러하면 null로 전달된다.

     

    Producer에서 send()는 즉각 전송이 아닌 프로듀서 내부에 가지고 있다가 배치 형태로 묶어서 브로커에 전송하는데

    이를 배치 전송 이라 한다. 

     

    flush()메서드를 호출 하여 프로듀서 내부 버퍼의 레코드 배치를 브로커로 전송한다.

     

    close()메서드를 호출하여 producer 인스턴스 리소스를 종료한다.

     

     

    test 토픽 생성후 해당 애플리케이션을 전송하면

     

    프로듀서 로그

     

    해당 로그들이 나오게 되며 적용한 프로퍼티들과 메시지 전송내용을 확인할 수 있다.

     

    제대로 전송이 되었는지 콘솔 컨슈머로 확인해볼 수 있다.

     

    .\bin\windows\kafka-console-consumer.bat `
    --bootstrap-server wsl.kafka:9092 `
    --topic test `
    --from-beginning

     

     

     

    프로듀서의 개념

    프로듀서는 카프카 브로커로 데이터 전송시 내부적으로 파티셔너, 배치 생성 단계를 거친다.

     

    아파치 카프카 애플리케이션 프로그래밍 With 자바

     

     

    ProducerRecord 객체에는 인스턴스 생성시 위와같은 필드값들을 설정할 수 있는데,

    생성 필수 파라미터인 토픽과 메시지값만 설정하여도 전송이 가능하다.

    타임스탬프의 경우 설정을 안하게 되면 브로커 시간을 기준으로 설정된다.

     

    Producer가 send()를 호출하면 Partitioner가 토픽의 어느 파티션으로 전송할 것인지 정한다.

    (기본 클래스는 DefaultPartitioner)

    파티셔너에 의해 구분된 레코드는 어큐뮬레이터에 데이터를 버퍼로 쌓아놓고

    센더 스레드를 통해 발송한다.(배치 전송)

     

    프로듀서 API는 'UnifomStickyPartitioner' 와 'RoundRobinPartitioner' 2개의 파티셔너를 지원하는데

    2.5.0버전 이후 부터는 UnifomStickyPartitioner가 기본 파티셔너이다.

     

    3.6.1버전에서 Partitioner 구현체를 따라가보니 RoundRobin(파티션 순회방식)

    을 제외한 파티셔너는 모두 deprecated되었다.

    DefaultPartitioner의 글을 읽어보니

    기본 파티셔닝 전략은 레코드에 파티션이 지정되어있으면 해당 파티션을 따르고

    키가 있는 경우 키의 해시를 기반으로 파티션을 선택한다고 한다.

    만약 파티션, 키 둘다 없는 경우는 배치가 가득찰때까지 사용하는 고정 파티션을 선택한다고 한다.

    (균일한 고정 배치 크기)

    만약 사용자 지정 파티셔너를 사용하고 싶으면 Partitioner 인터페이스를 구현하여 변경할 수 있다.

     

    카프카 프로듀서는 압축 옵션도 제공하는데, 압축하는데 CPU, 메모리 리소스를 사용하기 때문에 주의해야하며

    컨슈머에서도 압축된 내용을 해제하는데에 리소스가 사용되는점을 주의하자.

     

     

    카프카 주요 옵션

     

    카프카 옵션은 필수 옵션과 선택옵션이 있는데, 선택옵션이라고 해서 중요하지 않은것이 아니라

    Default값을 보고 수정할 수 있어야한다.

     

     

    필수 옵션

    • bootstrap.servers: 프로듀서 데이터를 전송할 대상 호스트이름:포트 1개이상 작성
    • key.serializer: 메시지 키를 직렬화하는 클래스를 지정
    • value.serializer: 메시지 값을 직렬화하는 클래스 지정

     

    선택 옵션

    • acks: 프로듀서가 전송한 데이터가 브로커들에게 정상적으로 저장되었는지 전송 성공 여부 확인하는데
      필요한 옵션
      1은 리더 파티션에 데이터가 저장되면 전송 성공으로 판단.
      0은 브로커에 데이터 저장 여부 상관없이 성공으로 판단
      -1, all은 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터 저장시 성공
    • buffer.memory: 브로커로 전송할 데이터를 배치로 모으기 위해 설정판 버퍼 메모리양 (32MB가 기본값)
    • retries:브로커로부터 에러를 받고 난후 재전송을 시도하는 횟수 (기본값 2147483647)
    • batch.size: 배치로 전송할 레코드 최대 용량 지정 (16384)
    • linger.ms: 배치를 전송하기 전까지 기다리는 최소시간 (기본값 0)
    • partitioner.class: 레코드를 파티션에 전송할 때 적용할 파티셔너 클래스 지정
    • enable.idempotence: 멱등성 프로듀서 동작 여부 설정
    • transactional.id: 레코드 전송시 레코드를 트랜잭션 단위로 묶을지 여부 설정

     

     

    부가적인 내용

     

    메시지 키를 가진 데이터를 전송하는 프로듀서

     

    메시지 키가 포함된 레코드를 전송하고 싶으면 ProducerRecord생성시 파라미터를 추가하면된다.

    토픽이름, 메시지 키 , 메시지 값 순이다.

     

    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "t1", testMessage);

     

     

     

    컨슈머로 키 값 출력

    .\bin\windows\kafka-console-consumer.bat `
    --bootstrap-server wsl.kafka:9092 `
    --topic test `
    --property print.key=true `
    --from-beginning

     

     

     

     

    커스텀 파티셔너를 가지는 프로듀서

     

     

    파티션 임의 구현

        public class CustomPartitioner implements Partitioner {
    
            @Override
            public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                
                if (keyBytes == null) {
                    throw new InvalidRecordException("Message Key must not be null");
                }
                
                if (((String)key).equalsIgnoreCase("t1")) {
                    return 0;
                }
    
                List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
                int partitionSize = partitionInfos.size();
                return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionSize;
            }
    
            @Override
            public void close() {
    
            }
    
            @Override
            public void configure(Map<String, ?> configs) {
    
            }
        }

     

     

    configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

     

    프로퍼티 설정에 해당 파티셔너를 등록해주면 등록이 된다.

     

     

    .\bin\windows\kafka-consumer-groups.bat --bootstrap-server wsl.kafka:9092 `
    --group test-group `
    --describe

    를 통해 키값이 t1으로 주어졌을때 파티션이 0으로 몰리는 현상이 있었는지 검토해보았다.

     

    0으로 몰리는 것을 확인할 수 있었다.

     

     

     

     

    브로커 정상 전송여부를 확인하는 프로듀서

     

    producer.send(record).get()을 통해 동기적으로 데이터 전송 결과를 가져올 수 있다.

     

    RecordMetadata metadata = (RecordMetadata)kafkaProducer.send(record).get()

     

    test => 토픽을 의미한다.

    0 => 파티션을 의미한다.

    9 => 오프셋 번호를 의미한다.

    test 토픽의 0번 파티션의 9번 오프셋으로 부여되었다는것을 알 수 있다.

     

    그러나 동기적으로 가져오게 되면 전송과정에서 지연이 발생(응답값대기)하기 때문에 

    이를 원하지 않는 경우 비동기적으로 결과를 확인하면 된다. 

    이때 결과를 확인하기 위해 CallBack 인터페이스를 구현하여 해당 구현체를 사용해야한다.

     

     

    public class ProducerCallBack implements Callback {
    
        private final static Logger logger = LoggerFactory.getLogger(ProducerCallBack.class);
    
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (Objects.nonNull(exception)) {
                logger.error("Kafka Send Error = {}", exception.getMessage(), exception);
            } else {
                logger.info("Kafka MetaData = {}", metadata);
            }
    
        }
    }

     

    전송 요청시 콜백 클래스를 넣으면 비동기로 처리할 수 있다.

            kafkaProducer.send(record, new ProducerCallBack());

     

     

    ※ 주의할점 

    비동기로 결과를 받는 경우 동기로 결과를 받는 것보다 빠르게 처리할 수 있다.

    그러나, 전송하는 데이터 순서가 중요한 경우 사용하면 위험하다.

    비동기로 결과를 기다리는 동안 다음으로 보낼 데이터 전송이 성공하고 앞서 보낸 데이터가 실패할 경우

    재전송으로 인해 데이터 순서가 역전될 수 있기 때문이다.

     

Designed by Tistory.