ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 아파치 카프카 문서보고 공부하기 #2
    SPRING 공부/기타 2023. 1. 4. 01:57

    #1에서 아파치 카프카를 설치해서 쓰고(pub), 읽는(sub) 작업까지 해보았다.

    외부 연동에 대해서 알아보자.

     

    다시시작하는 도중 에러가 났는데

     ERROR Failed to clean up log for __consumer_offsets

    가 나서 찾아보니 카프카가 retension 주기만큼 데이터를 가지고 있다가 삭제하는데, 삭제해야하는 로그파일이 존재하지 않아서 나는 오류라고 한다? (윈도우가 주로 일어난다고한다.)

    해결방법은 server.properties에 있는 log.dirs 경로의 내용을 모두 삭제하거나

    그냥 linux를 쓰라고 하더라...

    6. 카프카 커넥트

    데이터베이스나, 전통적인 메시징 시스템 혹은 많은 애플리케이션에서 사용중인 데이터를 

    카프카 커넥트를 통해 임포트(추출)해와서 카프카의 topic으로 집어넣을 수 가 있다. 

     

    connector는 standalone 모드와 distributed 모드가 있다.

    • standalone : 단일 로컬 전용 프로세스로 사용된다. 설정 및 시작이 간단하지만 한 명의 작업자만 사용할 수 있다. 모니터링 하기 어렵고 장애를 허용한다.
      개발과 테스트에 유용하다.
    • distributed : 여러작업자들이 여러개의 커넥터와 작업들을 진행한다.
      Rest API를 통해 설정을 제출할 수 있고, 확장가능하다.
      모두에 대한 장애가 발생했을때의 처리를 제공한다.
      운영 배포된 커넥터들에게 유용하다.

     

    우선 config폴더에 있는

    connect-standalone.properties

    connect-file-source.propertiesconnect-file-sink.properties

    을 수정해준다.

     

    connect-stanalone.properties

    plugin.path를 주석 해제하고 다음과같이 해당 라이브러리가 있는 폴더를 설정하여야 한다.

    plugin.path=C:\\kafka\\libs\connect-file-3.3.1.jar

    추가적으로 offset.storage.file.filename이 있는데, 오프셋을 저장할 파일 위치를 정할 수 있다.

     

    오프셋 이란?

    컨슈머그룹이 메시지를 가져갈때 어느 위치까지 가져갔었는지 기록하기 위한 파일이다.

    오프셋 파일을 확인해보면 다음과 같이 나온다.

     

    connect-file-source.properties

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    # 파일경로
    file=C:\\Users\\KIMTAEHYUN\\Desktop\\test.txt
    topic=토픽명

    connect-file-sink.properties

    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    # 파일경로
    file=C:\\Users\\KIMTAEHYUN\\Desktop\\test.sink.txt
    topics=토픽명

    이 설정이 완료되면 드디어 실행할 준비가 된것이다.

     

    마지막으로 선행해야 할 것이 있는데,

    위에 connect-file-source에 설정한 test.txt을 만들고 안에 내용을 삽입해야하는 것이다.

    powershell 명령어로 간단하게 만들 수 있다.

    echo "foo" > 파일경로\test.txt

    주의할 점이 있다.

    powershell 버전이 6.0 부터 UTF-8 인코딩을 기본으로 사용하는데, 만약 버전이 이전버전이면

    해당 인코딩을 사용안하게되면서, 메시지가 깨지게 되어있다. 따라서 파워쉘 사용시에는

    $PSVersionTable

    해당 명령어를 쳐서 파워쉘 버전을 확인해보자

    필자의 경우 5.1버전이 나와서 인코딩방법을 찾다가 버전업을 하는 방향으로 진행하였다.

     

    Windows에 PowerShell 설치 - PowerShell | Microsoft Learn

     

    Windows에 PowerShell 설치 - PowerShell

    Windows에서 PowerShell을 설치하는 방법에 대한 정보

    learn.microsoft.com

    7.3버전의 경우 위에서 다운로드 받을 수 있다.

     

    버전업데이트를 하고나서 자동 완성 기능을 제공해서... 훨씬 수월하게 테스트 할 수 있게되었다.

     


     

    이어서 echo명령어로 test.txt를 생성하였으면 다시 cat .\test.txt 명령어로 내용이 들어있는지 확인해보자.

     

    이제 준비작업은 마쳤다.

     

    connect로부터 데이터가 producing되는지 확인하려면 consumer-console를 이용해서 확인한다.

     

    .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic 토픽명 --from-beginning

    을 이용해서 콘솔 consumer를 가동시킨다.

     

    이제 connect를 띄워보자 명령어는 다음과 같다.

    .\connect-standalone.bat ..\..\config\connect-standalone.properties ..\..\config\connect-file-source.properties ..\..\config\connect-file-sink.properties

    모든 내용을 정확히 이해하진 못했지만 위에서 설정한 3개의 설정을 가지고 standalone이라는 connect를 실행한다는 의미이다.

     

    실행후에 consumer를 확인해보면

    다음과 같은 데이터가 확인되는것을 볼 수 있다.

    만약 파일에 echo 명령어로 내용을 추가하면 다음과 같이 메시지가 계속 추가되는 것을 확인할 수 있다.

    echo "테스트명령어" >> test.txt

    위에 나온 것은 한글로 한번 쳐봤는데, 아마 출력 인코딩이 한글을 지원안하는 것 같다.. (추가적으로 찾아보겠음)

     

     

    7. Kafka Streams를 이용하여 이벤트 처리하기

    카프카 이벤트 데이터를 저장할 때, 자바/스칼라를 위한 Kafka Streams Client Library 를 사용 할 수 있으며,

    순수 자바 어플리케이션에 사용할 수 있다. 

    해당 라이브러리는, 정확하게 한번만 처리할 수 있게하며 상태 저장 작업 및 집계, 윈도우 설정등을 지원한다.

     

    Consumer 와의 비교

    https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer

     

    Kafka Consumer

    • 컨슈머와 프로듀서 사이의 책임이 분리되어있다.
    • 단일 프로세싱이다.
    • 배치 처리를 지원한다.
    • 무상태 처리만 지원한다. 즉, 클라이언트가 이전의 상태 혹은 값을 저장하지 않고 레코드를 개별적으로 처리한다.
    • 어플리케이션에 사용할때 많은 코드가 필요한다. 
    • 쓰레드 화 혹은 병렬처리를 사용하지 않는다.
    • 여러 카프카 클러스터로 작성될 수 있다.

     

     

    Kafka Streams

    • 하나의 카프카 스트림이 생산하고 소비한다.
    • 복잡한 처리를 수행한다.
    • 배치 처리는 지원하지 않는다.
    • 상태, 무상태 처리를 둘다 지원한다.
    • 적은 코드로 애플리케이션 사용이 가능하다.
    • 쓰레드화 혹은 병렬처리화 한다.
    • 오직 하나의 카프카 클러스터만 상호작용한다.
    • 메시지를 저장, 전송하기 위해 논리 단위로 파티션 및 작업 스트리밍을 한다.

     

    가이드로 제공하는 샘플 사용해보기

     

    해당 샘플은 중복문자가 발생하였을때 카운트해주는 로직이 작성된 샘플이다.

    예를들어

    test

    test

    move

    세개의 단어가 메시지가 전송되면

    test 1

    test 2

    move 1

    이런식으로 카운팅을 해주게 된다.

     

    또한 메시지 전달과정은 다음과 같다.

     

     

    package kafkastreams;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    public final class MessageCount {
    
        public static final String INPUT_TOPIC = "streams-plaintext-input";
        public static final String OUTPUT_TOPIC = "streams-wordcount-output";
    
        static Properties getStreamsConfig(final String[] args) throws IOException {
            final Properties props = new Properties();
            if (args != null && args.length > 0) {
                try (final FileInputStream fis = new FileInputStream(args[0])) {
                    props.load(fis);
                }
                if (args.length > 1) {
                    System.out.println("Warning: Some command line arguments were ignored. This demo only accepts an optional configuration file.");
                }
            }
            props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
            // Note: To re-run the demo, you need to use the offset reset tool:
            // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
            props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
    
        static void createWordCountStream(final StreamsBuilder builder) {
            final KStream<String, String> source = builder.stream(INPUT_TOPIC);
    
            final KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                    .groupBy((key, value) -> value)
                    .count();
    
            // need to override value serde to Long type
            counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        }
    
        public static void main(final String[] args) throws IOException {
            final Properties props = getStreamsConfig(args);
    
            final StreamsBuilder builder = new StreamsBuilder();
            createWordCountStream(builder);
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);
    
            // attach shutdown handler to catch control-c
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (final Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }

     

    - 의존성

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    implementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '3.3.1'
    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '3.3.1'
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.3.1'

     

    해당 파일을 JAR로 빌드한다.

     

    스프링부트가아닌 Java 실행파일이기 때문에, 라이브러리들이 같이 패키징 되지 않는다.

    따라서 Gradle에 다음과 같이 써넣어야 한다.

     

    jar {
        archiveName("Test.jar")
        manifest {
            attributes 'Main-Class': 'kafkastreams.MessageCount'
        }
        from {
            configurations.runtimeClasspath.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
        duplicatesStrategy = DuplicatesStrategy.EXCLUDE
    }

    빌드 배포할 jar명을 정하고, 메인 클래스를 정해주어야한다. (manifest)

    그 후 런타임 실행환경을 수집하여 모아준다.

    이때 주의 해야할 것이 duplicatesStrategy를 설정해주어야하는데,

    중복된 라이브러리에 대한 처리를 어떻게 할 것 인지에 대한 설정이다. (설정 하지 않으면 오류 발생)

     

     

    다음으로 넘어가서

    Topic 생성과 , 메시지를 전송해야한다.

    /kafka-topics.bat --create `
        --bootstrap-server localhost:9092 `
        --replication-factor 1 `
        --partitions 1 `
        --topic streams-plaintext-input 
        
    /kafka-topics.bat --create `
        --bootstrap-server localhost:9092 `
        --replication-factor 1 `
        --partitions 1 `
        --topic streams-wordcount-output `
        --config cleanup.policy=compact

    Java 코드에 존재하는 INPUT TOPIC 의 'streams-plaintext-input' 와

                                        OUTPUT TOPIC 의 'streams-wordcount-output' 을 우선 생성한다.

     

    그 다음 어플리케이션을 실행시킨다.

    java -jar test.jar

     

    이제 스트림즈가 작동되려면 INPUT TOPIC 에 메시지를 전달하여야 한다.

    /kafka-console-producer.bat --bootstrap-server localhost:9092 --topic streams-plaintext-input

     

    마지막으로 위의 어플리케이션을 통해 전달될 OUTPUT TOPIC의 Console Consumer를 실행하여야한다.

    이때 각 옵션들을 기입한다. (기입하지 않을 시 역직렬화가 되지 못해 제대로 읽어오지 못 한다.)

    /kafka-console-consumer.bat --bootstrap-server localhost:9092 `
        --topic streams-wordcount-output `
        --from-beginning `
        --formatter kafka.tools.DefaultMessageFormatter `
        --property print.key=true `
        --property print.value=true `
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer `
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

     

    이제 위의 console-producer에서 메시지를 작성하여보자

     

    여러가지 키워드들을 던져보았다.

    아직 설정이 미숙하여서 hello world를 쳤음에도 2개의 단어를 들어가는 것을 아래의 console-consumer에서

    확인할 수 있다.

     

     

     

    8. 카프카 환경 종료하기

    1. Producer와 Consumer Client를 종료 시킨다.
    2. Kafka Broker를 종료시킨다.
    3. ZooKeeper 서버를 종료시킨다.

    이벤트 발생하여 생성된 데이터를 삭제하려면

    rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

    를 실행하여 각 관련 로그들을 삭제 해준다.

    'SPRING 공부 > 기타' 카테고리의 다른 글

    MyBatis 의도치 않은 캐싱  (0) 2023.09.05
    아파치 카프카 문서보고 공부하기 #1  (0) 2022.12.26
    RFC 문서  (0) 2022.12.03
    템플릿 엔진  (0) 2022.02.01
Designed by Tistory.