른록노트
[Kfaka/Zookeeper] kafka connect 실행 - 2 본문
@ 시작하기전
Window PC에서 VMware로 Linux서버 3대로 클러스터 구성할 계획입니다.
서버사양
OS : CentOS7.4
CPU : 3 core
Memory : 6 GB
주키퍼 주키퍼 버전 : zookeeper-3.5.8
설치할 카프카 버전 :kafka 2.13-2.6.0
이전글 (llnote.tistory.com/685)
@ 목표
(서버 3대에서 kafka connector 구동)
FileStreamSourceConnector를 사용해 Source Connector를 만들어서 topic에 데이터 저장하는 connector 생성하기
FileStreamSinkConnector를 사용해 Sink Connector를 만들어서 파일로 내보내는 connector 생성하기
* Source Connector - 외부에서 카프카로 데이터 전송하는 컨넥터
* Sink Connector - 카프카에서 밖으로 데이터 전송하는 컨넥터
@ 방법
(kafka.apache.org/documentation/#connect)
(docs.confluent.io/5.0.0/connect/userguide.html)
(sup2is.github.io/2020/06/08/kafka-connect-example.html)
[모든서버]
1. kafka connect 설정 변경
> vim config/connect-distributed.properties
bootstrap.servers=server-1:9092,server-2:9092,server-3:9092
group.id=kafka-connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
#kafka connect 관련 토픽
[서버1] client.id=cli-1
[서버1] rest.advertised.host.name=server-1
[서버1] rest.advertised.port=8083
[서버2] client.id=cli-2
[서버2] rest.advertised.host.name=server-2
[서버2] rest.advertised.port=8083
[서버3] client.id=cli-3
[서버3] rest.advertised.host.name=server-3
[서버3] rest.advertised.port=8083
2. connect-distributed 실행 (다중모드)
> bin/connect-distributed.sh config/connect-distributed.properties
3. 다른 터미널을 켜서 connect-distributed 확인
> curl localhost:8083
4. FileStreamSourceConnector를 사용해 Source Connector를 만들어서 topic에 데이터 저장하는 connector 생성하기
* FileStreamSourceConnector는 소스상에서 task를 한개밖에 사용하지 않아서 task가 생성된 서버를 확인 후 해당 서버에서 5번 부터 진행
> curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-source", "config": {"connector.class":"FileStreamSource", "file":"/root/test.txt", "topic":"connect-test", "tasks }}' http://localhost:8083/connectors
[task 생성된 서버]
5. /root/test.txt 파일 생성
> echo test1 > /root/test.txt
6. FileStreamSinkConnector를 사용해 Sink Connector를 만들어서 파일로 내보내는 connector 생성하기
echo '{
"name" : "my-first-sink",
"config" : {
"connector.class" : "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file" : "/root/test_result.txt",
"topics" : "connect-test",
"tasks.max" : 1
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
7. 생성된 커넥터와 task 확인
curl -s "http://localhost:8083/connectors?expand=info&expand=status"
8. 새로운 터미널을 켜서 Sink Connector가 잘 동작하는지 확인하기 위해 모니터용
> tail -f /root/result_test.txt
9. /root/test.txt 파일에 한줄씨 추가하며 8번 내용 확인하기
> echo test2 >> /root/test.txt
10. 참고 REST API
> curl localhost:8083/connector-plugins #사용가능한 플러그인
> curl -X DELETE http://localhost:8083/connectors/local-file-source #커넥터 삭제
> curl -X DELETE http://localhost:8083/connectors/my-first-sink #커넥터 삭제
> curl localhost:8083/connectors # 커넥터 확인
@ 마치며
카프카 외에도 여러가지 프로세스를 실행하니 서버 메모리가 부족한것같아 4GB -> 6GB로 변경하였습니다.
connector의 task는 connector 클래스의 taskConfigs 함수에서 정해줍니다.