티스토리 뷰
카프카 커넥트
카프카 커넥트는 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다.
카프카 프로듀서-컨슈머 애플리케이션과 유사하지만 해당 애플리케이션은 세밀한 제어, 고유한 요구 사항의 경우
사용하고 커넥트의 경우 표준화된 접근방식에 대해 사용하게 된다. (템플릿)
파이프 라인 생성시 자주 반복되는 값(토픽 이름, 파일 이름, 테이블 이름)등을 파라미터로 받는 커넥터를 코드로 작성
후에 파이프라인을 실행할 때는 코드 작성할 필요가 없기 때문이다.
카프카 커넥터는 프로듀서 역할을 하는 '소스 커넥터' 와 컨슈머 역할을 하는 '싱크 커넥터' 로 나뉜다.
일정한 프로토콜을 가진 소스 애플리케이션이나 싱크 애플리케이션이 있다면 커넥터를 통해 카프카로 데이터를 보내거나
데이터를 가져올 수 있다.
카프카 libs폴더를 확인하면 기본으로 제공하는 커넥터 플러그인들을 확인할 수 있다.
만약 추가해서 사용하고 싶으면 직접 플러그인을 제작하거나 인터넷 상에 존재하는 플러그인을 가져다 사용할 수 있다.
오픈 소스 커넥트는 Home | Confluent Hub 에서 다운 받을 수 있는데, License항목을 유의해서 보도록하자.
free를 체크해야 오픈소스로서 사용할 수 있으며 그 이상은 컨플루언트의 구독이 필요하다.
(커머셜이나 프리미엄은 싱글 브로커 클로스터(개발자) 혹은 30일 정도만 무료로 사용할 수 있다.)
사용자가 커넥트에 커넥터 생성 명령을 내리면 내부에 커넥터와 태스크를 생성한다.
커넥터들은 태스크들을 관리하며, 테스크가 실질적인 데이터 처리를 한다.
사용자가 커넥터를 사용하여 파이프라인 생성시에는 컨버터 와 트랜스폼 기능을 옵션을 추가할 수 있는데,
컨버터는 데이터 처리를 하기 전 스키마 변경을 도와준다. JsonConverter, StringConverter, ByteArrayConverter등이
있으며, 필요한 경우 커스텀 컨버터를 사용할 수 있다.
트랜스폼은 데이터 처리시 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용된다.
Json데이터를 커넥터에서 상요할 때 트랜스폼을 사용하면 특정 키를 삭제하거나 추가할 수 있다.
Cast,Drop, ExtractField 등이 있다.
커넥트를 실행하는 방법
커넥트 실해모드는 2가지가 존재하는데
1. 단일모드 커넥트: 단일 애플리케이션으로 실행되어 고가용성이 구성되지 않아 단일 장애점(SPOF)이 될 수 있다.
주로 개발환경이나 중요도가 낮은 파이프라인 운용시 사용한다.
단일모드 커넥트를 실행하려면 connect-standalone.properties 파일을 수정해야한다.
1. 서버설정,
2. 컨버터 설정 key.converter.schemas.enable 의 경우 스키마 형태를 포함(true) 미포함(false)로 나누는 옵션이다.
참고링크: Azure Cosmos DB용 Kafka 커넥트 사용하여 데이터 읽기 및 쓰기 | Microsoft Learn
3. 단일모드 사용시 로컬파일에 오프셋 정보를 저장하는데, 소스 커넥터, 싱크 커넥터가 데이터 처리시점을 저장하기 위해
사용한다.
4. 태스크 작업 처리후 오프셋 커밋하는 주기 설정
5. 플러그인 형태로 추가할 커넥터의 디렉토리 주소 (jar파일의 디렉토리 위치) 2개이상은 , (콤마)로 구분한다.
(주석을 지우고 지정을 해주어야한다.)
단일 모드 커넥트의 경우 커넥트 설정파일과 함께 커넥터 설정파일도 정의하여 실행하여야한다.
connect-file-source.properties로 저장되어있다.
connect-file-source.properties
1. 커넥터 이름을 지정하는데, 커넥트 에서 유일한 이름이여야한다.
2. FileStreamSource는 카프카에서 기본으로 제공하는 기본 클래스이며 파일 소스 커넥터를 구현한 클래스
파일이다.
3. 커넥터를 실행한 테스크 개수를 지정하는데, 개수를 늘려서 병렬 처리할 수 있다.
4.읽을 파일 위치를 지정한다.
5. 토픽 이름 지정한다.
실행은 다음 명령어로 진행할 수 있다.
.\connect-standalone.bat ..\..\config\connect-standalone.properties ..\..\config\connect-file-source.properties
2. 분산 모드 커넥트: 2대 이상의 서버에서 클러스터 형태로 운영하며 단일 모드 커넥트 대비 안전하게 운영된다.
데이터 처리량이 많아질 경우 무중단으로 스케일 아웃하여 처리량을 늘릴 수 있다.
(상용환경에서는 안정적으로 운영할 수 있게 2대 이상으로 구성 및 설정하는 것이 좋다.)
분산 모드 커넥트는 단일 모드와 다르게 2개 이상의 프로세스가 1개의 그룹으로 묶여 운영된다.
connect-distributed.properties를 보자.
1. 서버설정
2. 그룹 id 설정 동일한 group.id으로 지정된 커넥트들은 같은 그룹으로 인식되어, 그룹 내에서 커넥터가 실행되면
커넥트들에게 분산되어 실행된다.
3. 컨버터 설정
4. 분산모드인 경우 카프카 내부 토픽에 오프셋 정보를 저장하게 된다. 해당 정보는 데이터 처리시 중요하므로
실제 운영할때는 복제 개수를 3보다 큰값으로 설정하는 것이 좋다.
5. 오프셋 커밋 주기
6. 플러그인 추가할 디렉토리
분산 모드 커넥트를 실행할 때는 커넥트 설정파일만 있으면 된다. Rest API를 통해 실행/중단/변경 할 수 있기 때문이다.
.\connect-distributed.bat ..\..\config\connect-distributed.properties
아래 Rest API 호출을 통해 사용가능한 플러그인이 검색가능하고
curl -X GET http://localhost:8083/connector-plugins
FileStreamSourceConnector로 커넥터를 실행해보자.
C:\Users\KIMTAEHYUN> curl -X POST -H "Content-Type: application/json" `
--data '{
"name": "localhost-file-source2",
"config":
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "C:\\kafka2.13\\config\\temp\\test.txt",
"tasks.max": "1",
"topic": "connect-distibute"
}
}' `
http://localhost:8083/connectors
정상 작동여부
curl -X GET http://localhost:8083/connectors/localhost-file-source/status
커넥터 종료
curl -X DELETE http://localhost:8083/connectors/localhost-file-source
커넥터 목록 확인
curl -X GET http://localhost:8083/connectors
빈 배열이 호출되면 삭제된것을 확인 할 수 있다.
분산모드 커넥트는 서버로 운영되어있기 때문에 Rest API를 통해 커넥트의 커넥터 플러그인 종류, 태스크 상태,
커넥터 상태 등을 조회할 수 있으며 8083포트로 호출 가능하다. (설정 통해 변경가능)
Connect REST Interface | Confluent Documentation에서 API종류를 확인 할 수 있다.
소스 커넥터
소스 커넥터는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.
오픈소스 소스 커넥터를 사용해도 되지만 라이센스, 로직 요구사항이 맞지 않는 경우 직접 개발해야할 때
카프카 커넥트 라이브러리에서 제공하는 SourceConnector와 Source Task클래스를 이용하여 구현가능하다.
build.gradle에 다음 종속성을 추가하여 개발해보도록 하자.
dependencies {
compileOnly 'org.apache.kafka:connect-api:3.6.1'
}
소스 커넥터를 만들 때 필요한 클래스는 2개이다.
1. SourceConnector: 태스크를 실행하기전 커넥터 설정 초기화, 어떤 태스크 클래스를 사용할 지 정의하는데 사용된다.
이 클래스를 상속받는 클래스는 커넥트 호출시 사용되므로 명확하게 어디에 사용되는 커넥터인지 적으면 좋다.
SourceConnect 클래스를 상속받는 경우 여러가지 추상메서드를 구현을 해야하는데
다음과 같은 메서드들이 존재한다.
1) version() : 커넥터 버전을 리턴하는데, 커넥터 플러그인 조회시 이 버전이 노출된다.
2) start(Map<String, String> props) : JSON 또는 config 파일 형태로 입력한 설정값을 초기화 하는 메서드
여기에 검증하는 로직을 넣어서 ConnectionException()을 호출하여 커넥터를 종료시킨다.
3) taskClass() : 커넥터 사용시 태스크를 수행할 클래스를 지정한다.
4) taskConfigs(int maxTasks): 태스크 개수가 여러개인 경우 옵션 설정을 다르게 해야할 경우 사용한다.
같은 설정인 경우 위에 start(Map<String, String> props) 의 props를 필드로 소유하여 모든 태스크에 입력해주면 된다.
5) config() : 커넥터가 사용할 설정값에 대한 정보를 받는다. 이름, 기본값, 중요도, 설명 등을 정의한다.
책에서는 Config 설정을 초기화하는 클래스를 따로 정의 해주었는데
카프카 라이브러리에서 제공하는 AbstractConfig 클래스를 상속받아 구현하였다.
해당 클래스는 메서드를 구현할 필요는 없고 생성자에 대한 정의만 필요하다.
또한 해당 메서드는 CofigDef 클래스를 리턴 받는데,
플루언트 스타일로 옵션들을 넣을 수 있다.
define()메서드는 수많은 파라미터를 받게되는데
[1] name: 설정 파라미터 명
[2] type: 설정의 타입(문자, 숫자, 클래스,리스트 등등)
[3] defaultValue: 설정의 기본값 (설정값 미존재시)
[4] importance: 설정의 중요도 HIGH, MEDIUM, LOW가 존재한다.
HIGH: 입력 설정 필요
MEDIUM: 입력값 없어도 상관없고, 기본값 존재
LOW: 입력값 없어도 되는 설정
[5] documentation: 해당 설정에 대한 설명
6) stop() : 커넥션 종료시 필요한 로직을 넣는다.
2. SourceTask: 소스 애플리케이션 또는 소스 파일로 부터 데이터를 가져와서 토픽을 보내는 역할을 수행한다.
이때 이 Task의 오프셋은 토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용한다.
SourceTask에서 사용하는 오프셋은 소스 애플리케이션 혹은 소스 파일로부터 어디 까지 읽었는지
(오프셋 스토리지에) 저장한다.
그래서 소스 태스크 클래스를 작성할때는 소스 파일로부터 읽고 토픽으로 보낸 지점을 기록하는것을 작성하여한다.
만약 기록하지 않는다면 데이터를 중복으로 보내질 수 있다.
다음과 같은 메서드들이 존재한다.
1) version() : 태스크의 버전을 지정한다. - 보통 커넥터의 버전과 동일한 버전으로 작성한다.
2) start(Map<String, String> props) : 태스크 시작시 필요한 로직을 작성한다. 데이터 처리에 필요한 모든 리소스를
여기서 초기화 한다. JDBC 커넥션을 생성한다면 여기서 시작할 수 있다.
3) poll() : 소스로부터 데이터를 읽어오는 로직을 작성한다. 토픽으로 보낼 데이터를 SourceRecord로 정의한다.
4) stop(): 태스트 종료시 필요한 로직을 작성한다. (JDBC 커넥션을 종료하는 로직등)
일부 코드는 다음과 같다.
* 파일로부터 오프셋 위치를 가져오고 필드에 저장
SimpleFileSourceConnectorConfig config = new SimpleFileSourceConnectorConfig(props);
file = config.getString(SimpleFileSourceConnectorConfig.DIR_FILE_NAME);
filenamePartition = Collections.singletonMap(FILENAME_FILED, file);
offset = context.offsetStorageReader().offset(filenamePartition);
position = findPosition(offset);
private long findPosition(Map<String, Object> offset) {
if (Objects.nonNull(offset)) {
Object lastReadFileOffset = offset.get(POSITION_FILED);
if (Objects.nonNull(lastReadFileOffset)) {
return (long) lastReadFileOffset;
}
return position;
}
return 0;
}
* 파일로부터 데이터를 읽어오고 토픽에 전달
List<String> lines = getLines(position);
if (!lines.isEmpty()) {
lines.forEach(line -> {
Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FILED, ++position);
SourceRecord sourceRecord = new SourceRecord(filenamePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
recordList.add(sourceRecord);
});
}
※ 배포시 주의점
사용자가 직접 작성한 클래스 뿐만 아니라 참조하는 라이브러리도 함께 빌드하여 jar로 압축해야한다.
참조했던 의존성들을 같이 빌드하지 않게되면 커넥터 실행시 다른 의존성을들 찾지 못하는 ClassNotFoundException
이 발생할 수 있게된다.
그래서 fat jar 형태로 배포해야한다.
별도의 작업을 만들거나
task customFatJar(type: Jar) {
archiveBaseName = 'all-in-one-jar'
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from { configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) } }
with jar
}
jar 작업 수행시 추가할 수 있다.
jar {
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
}
'JAVA공부 > JAVA' 카테고리의 다른 글
Integer 동일 비교 (4) | 2024.10.07 |
---|---|
MANIFEST.MF (1) | 2024.02.05 |
Apache Kafka - 부분 내용 정리 ② (1) | 2024.01.15 |
Java 8 ~ 17 주요 변경점 정리하기 (0) | 2023.09.17 |
Java 성능 모니터링 ②-① 본론 (VisualVM 분석 및 연동) (0) | 2023.09.13 |