른록노트

[Kfaka/Zookeeper] kafka connect 실행 - 2 본문

Server,OS/[Linux]

[Kfaka/Zookeeper] kafka connect 실행 - 2

른록 2020. 9. 29. 00:55

@ 시작하기전

Window PC에서 VMware로 Linux서버 3대로 클러스터 구성할 계획입니다.

서버사양

OS : CentOS7.4

CPU : 3 core

Memory : 6 GB

주키퍼 주키퍼 버전 : zookeeper-3.5.8

설치할 카프카 버전 :kafka 2.13-2.6.0

이전글 (llnote.tistory.com/685)

 

@ 목표

(서버 3대에서 kafka connector 구동)

FileStreamSourceConnector를 사용해 Source Connector를 만들어서 topic에 데이터 저장하는 connector 생성하기

FileStreamSinkConnector를 사용해 Sink Connector를 만들어서 파일로 내보내는 connector 생성하기

* Source Connector - 외부에서 카프카로 데이터 전송하는 컨넥터

* Sink Connector - 카프카에서 밖으로 데이터 전송하는 컨넥터

 

@ 방법

(kafka.apache.org/documentation/#connect)

(docs.confluent.io/5.0.0/connect/userguide.html)

(sup2is.github.io/2020/06/08/kafka-connect-example.html)

 

[모든서버]

1. kafka connect 설정 변경

> vim config/connect-distributed.properties

bootstrap.servers=server-1:9092,server-2:9092,server-3:9092
group.id=kafka-connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
#kafka connect 관련 토픽


[서버1] client.id=cli-1 
[서버1] rest.advertised.host.name=server-1
[서버1] rest.advertised.port=8083 
[서버2] client.id=cli-2 
[서버2] rest.advertised.host.name=server-2 
[서버2] rest.advertised.port=8083 
[서버3] client.id=cli-3 
[서버3] rest.advertised.host.name=server-3 
[서버3] rest.advertised.port=8083 

2. connect-distributed 실행 (다중모드)

> bin/connect-distributed.sh config/connect-distributed.properties 

 

3. 다른 터미널을 켜서 connect-distributed 확인

> curl localhost:8083

 

4. FileStreamSourceConnector를 사용해 Source Connector를 만들어서 topic에 데이터 저장하는 connector 생성하기

* FileStreamSourceConnector는 소스상에서 task를 한개밖에 사용하지 않아서 task가 생성된 서버를 확인 후 해당 서버에서 5번 부터 진행

 

> curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-source", "config": {"connector.class":"FileStreamSource", "file":"/root/test.txt", "topic":"connect-test", "tasks }}' http://localhost:8083/connectors

[task 생성된 서버]

5. /root/test.txt 파일 생성

> echo test1 > /root/test.txt

 

6. FileStreamSinkConnector를 사용해 Sink Connector를 만들어서 파일로 내보내는 connector 생성하기

echo '{
  "name" : "my-first-sink",
  "config" : {
    "connector.class" : "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "file" : "/root/test_result.txt",
    "topics" : "connect-test",
	"tasks.max" : 1
  }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

7. 생성된 커넥터와 task 확인

curl -s "http://localhost:8083/connectors?expand=info&expand=status"

 

8. 새로운 터미널을 켜서 Sink Connector가 잘 동작하는지 확인하기 위해 모니터용

> tail -f /root/result_test.txt

 

9. /root/test.txt 파일에 한줄씨 추가하며 8번 내용 확인하기

> echo test2 >> /root/test.txt

 

10. 참고 REST API

> curl localhost:8083/connector-plugins #사용가능한 플러그인
> curl -X DELETE http://localhost:8083/connectors/local-file-source #커넥터 삭제
> curl -X DELETE http://localhost:8083/connectors/my-first-sink #커넥터 삭제
> curl localhost:8083/connectors # 커넥터 확인

 

@ 마치며

카프카 외에도 여러가지 프로세스를 실행하니 서버 메모리가 부족한것같아 4GB -> 6GB로 변경하였습니다.

connector의 task는 connector 클래스의 taskConfigs 함수에서 정해줍니다.

반응형
Comments