티스토리 뷰

JAVA공부

Apache Kafka - 부분 내용 정리 ①

CodingDreamTree 2024. 1. 8. 02:24

Kafka - Consumer - Groups.sh

Kafka 컨슈머 호출 및 옵션으로 지정한 쉘 컨슈머그룹을 생성할 수 있는데,

이를 kafka-consumer-groups.sh 스크립트로 확인가능하다.

 

목록을 확인하고자 하면

윈도우 기준으로 아래 명령어로 확인하면 된다.

 

.\kafka-consumer-groups.bat
--bootstrap-server wsl.kafka:9092
--list

 

컨슈머 그룹 목록

 

확인하게 되면 어떠한 컨슈머 그룹이 존재하는지 확인할 수 있다.

 wsl.kafka는 vim C:\Windows\System32\drivers\etc\hosts 을 통해 등록할 수 있는데  

Host IP 등록

나는 WSL을 사용했기 때문에 WSL에서 

 ip addr show eth0 | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1

명령어를 통해 내부 아이피 주소를 가져와서 진행하였다.

 

 

 

 

해당 컨슈머의 그룹이 어떠한 토픽 데이터를 가져갔는지 확인하려면 --group 옵션과 --describe 옵션을 주면된다.

 

.\kafka-consumer-groups.bat 
--bootstrap-server wsl.kafka:9092 `
--group hello-group `
--describe

 

group 옵션에는 어떠한 컨슈머 그룹에 대해 조회할건지 명시를 하여야한다.

컨슈머 그룹 조회

 

 

컬럼 설명
GROUP 컨슈머 그룹
TOPIC 해당 컨슈머 그룹이 마지막으로 커밋한 토픽명
PARTITION 마지막으로 커밋한 파티션 번호
CURRENT-OFFSET(CO) 컨슈머 그룹이 가져간
토픽의 파티션에 가장 최신 오프셋이 몇번인지 나타낸다.
LOG-END-OFFSET(LEO) 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지
커밋하였는지 알수 있다. <-
(파티션이 정확한 의미이다.)
항상 CURRENT_OFFSET <= LOG_END_OFFSET
LAG 컨슈머 그룹이 토픽 파티션에 있는 데이터를 가져가는데
얼마나 지연이 발생하는 지 나타내는 지표
LAG = LEO - CO
CONSUMER-ID 컨슈머의 토픽할당을 카프카 내부적으로 구분하기 위해 
사용하는 ID값 (Client Id + UUID로 자동 할당된다.)
HOST 컨슈머가 동작하는 Host명을 출력한다.
(호스트 혹은 IP 확인 가능)
CLIENT-ID 컨슈머에 할당된 ID 
사용자 지정할 수 있으며 지정하지 않으면 자동생성된다.

 

 

CONSUMER-ID , HOST,  CLIENT-ID 는 컨슈머 콘솔이 연결된 상태여야만 확인할 수 있다.

 

컨슈머 그룹 정보를 조회하는 것은 개발, 운영 둘다 중요하다.

컨슈머 그룹 중복여부를 체크하거나 랙이 얼마인지 확인하여 상태를 최적화하여야한다.

 

랙이 증가하는거는 프로듀서가 토픽 전달하는 속도 > 컨슈머 처리량 이기 때문에
파티션을 늘려 병렬처리를 도모할 수 도 있다. (줄이는 것은 불가능)

 

또한 카프카에 연결된 컨슈머의 호스트명 또는 IP를 알아낼 수 있다.

해당 IP가 인가된 사용자인지 확인할 수 있다.

 

 

 

Kafka - verifiable - producer, consumer .sh

 

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다.

간단한 네트워크 통신 테스트시 유용하다.

 

테스트 프로듀서 명령어

./kafka-verifiable-producer.sh 
--bootstrap-server localhost:9092 
--max-messages 10 
--topic verify-test

 

max-messages는 테스트 메시지 전송 개수를 설정하는 것인데, -1를 입력하면 해당 쉘이 종료될때까지 계속 보낸다.

 

 

유효성 프로듀서

 

테스트 메시지 전송후 가장 마지막에 통계정보가 나온다. 이때 평균 처리량을 확인할 수 있다.

필드 설명
timestamp 전송 시각 (밀리초)
name 전송 과정에 대한 이름 (시작 ~ 실행 ~ 종료)
sent Producer가 보낸 레코드, 메시지 수
acked Kafka브로커가 성공적으로 수신한 메시지, 레코드 수
target_throughput Producer가 달성하고자 하는 목표 처리량 
(-1의 경우 목표 처리량 설정이 없음을 나타낸다.)
avg_throughput Producer가 달성한 평균 처리량
초당 바이트 단위
위는 초당 26.861~~ 바이트

 

 

 

전송된 데이터는 컨슈머로 확인 가능하다.

 

./kafka-verifiable-consumer.sh 
--bootstrap-server localhost:9092 \
> --topic verify-test \
> --group-id test-group

 

 

유효성 컨슈머 결과

 

 

컨슈머가 실행되면 시작알림이 출력되고, 토픽에서 데이터를 가져오기 위해 파티션에 컨슈머를 할당하는 작업을 거친다.

record_consumed 의 count 횟수가 10으로 보아 메시지를 정상적으로 모두 받았음을 알 수 있다.

이 후 커밋이 완료되었다.

 

 

Kafka - delete - records .sh

이미 적재된 토픽의 데이터를 지우는 쉘 스크립트이다.

적재되어있는 토픽 데이터중 가장 오래된 (가장 낮은 오프셋의 데이터) 부터 특정 시점 오프셋까지 삭제 가능하다.

즉 0부터 100까지 데이터가 있다면 0 ~ 30 이런식으로 지우기는 가능하나, 30 ~ 50 식의 데이터 삭제가 불가능하다.

 

해당 쉘을 사용하려면 파일을 먼저 만들어야한다.

 

delete.topic.json

{
    "partitions": [
        {
            "topic": "hello.kafka.3",
            "partition": 2,
            "offset": 2
        }
    ],
    "version": 1
}

 

 

 

삭제 bat 파일

.\kafka-delete-records.bat `
--bootstrap-server wsl.kafka:9092 `
--offset-json-file delete-topic.json

 

 

실행 결과


low_watermark는 해당 파티션에 존재하도록 보장되는 최소 오프셋을 나타내며 이 오프셋보다 작은 오프셋을 가진 

모든 레코드를 제거한다.

 

 

만약 오프셋 범위를 넘어가게 되면 다음과 같이 나온다.

 

실패시

 

 

 

 

 

혹시나 해서 컨슈머 그룹을 살펴보았는데 변화는 없었다.

컨슈머 그룹 레코드 삭제 실행전

 

컨슈머 그룹 레코드 삭제 후

 

찾아보니 삭제에 대한 것은 논리적으로 삭제됨으로 표시되고 오프셋이 변동되지 않는다고한다.

 

아래 명령어로 2번 파티션의 오프셋을 초기화 해보고 조회해보았다.

.\kafka-consumer-groups.bat 
--bootstrap-server wsl.kafka:9092 
--group hello-group 
--reset-offsets 
--to-earliest 
--execute 
--topic hello.kafka.3:2

 

컨슈머 실행결과

 

실행 결과로는 삭제된 2건의 hi, hello를 제외한 2건의 Value들이 조회되었다.

 

컨슈머 그룹 재조회

 

 

이렇게 하였음에도 컨슈머 그룹은 초기화 되지 않았다.

 

 

※ 삭제시 주의해야 할 점은 토픽의 특정 레코드만 삭제되는 것이 아니라 오래된 오프셋 부터 지정한 오프셋 까지 삭제되기

때문에 운영환경에서 특히 주의가 필요하다. (특정 오프셋 삭제 불가)

 

 

 

 

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함