1. Kafka 다운로드 및 설치
1.1 다운로드
•
•
다운로드 받은 kafka_2.13-2.8.0.tgz 파일을 압축 해제하여 폴더로 변환합니다.
1.2 파일 구성 확인
•
kafka_2.12-3.6.0 폴더 안에는 주로 사용할 bin과 config 폴더가 있습니다.
◦
bin/windows 폴더: Zookeeper와 Kafka를 실행 및 종료할 수 있는 배치 파일이 있습니다.
◦
config 폴더: Zookeeper와 Kafka 설정을 담고 있는 설정 파일들이 있습니다.
2. Zookeeper 및 Kafka 실행
ZooKeeper란?
Apache ZooKeeper는 분산 시스템에서 설정 관리, 이름 서비스, 동기화, 그리고 그룹 서비스와 같은 서비스를 제공하는 중앙 집중식 서비스입니다.
ZooKeeper는 트리 구조로 데이터를 저장하며, 각 노드는 znode라고 불립니다. ZooKeeper는 높은 가용성을 제공하고, 분산 시스템의 여러 컴포넌트 간에 일관된 뷰를 제공하며, 고가용성과 확장성을 제공합니다.
Kafka
Apache Kafka는 실시간 스트림 처리를 위한 분산 메시지 브로커 시스템입니다.
Kafka는 대용량의 데이터 스트림을 처리하고, 여러 소비자에게 데이터를 분산시킬 수 있습니다.
Kafka는 높은 처리량, 확장성, 내결함성을 제공하며, 다양한 데이터 파이프라인과 스트리밍 애플리케이션에 사용됩니다.
ZooKeeper와 Kafka의 관계
Kafka는 내부적으로 ZooKeeper를 사용하여 클러스터의 메타데이터 관리, 리더 선출, 구성 관리, 그리고 브로커의 살아있음 확인(health checking)과 같은 여러 중요한 작업을 수행합니다.
1.
클러스터 메타데이터 관리: Kafka 클러스터의 모든 브로커는 ZooKeeper에 등록되어 있으며, ZooKeeper는 이러한 브로커들의 상태와 메타데이터 정보를 관리합니다.
2.
리더 선출: Kafka의 파티션에는 리더와 팔로워가 있으며, 모든 읽기 및 쓰기 요청은 리더를 통해 처리됩니다. ZooKeeper는 이 리더를 동적으로 선출하고 관리합니다.
3.
구성 관리: Kafka 클러스터의 구성 변경사항이 있을 때, 이 변경사항을 ZooKeeper에 저장하고 모든 브로커가 일관된 구성을 사용하도록 합니다.
4.
브로커의 살아있음 확인: ZooKeeper는 Kafka 브로커가 살아있는지 확인하기 위한 메커니즘을 제공하며, 브로커가 죽었을 때 다른 브로커에게 알려주어 리밸런싱을 수행할 수 있도록 합니다.
이러한 방식으로, ZooKeeper는 Kafka 클러스터의 안정적이고 일관된 운영을 지원하는 역할을 수행합니다.
2.1 Zookeeper 실행
1.
cmd 창을 열고 Kafka 폴더 위치로 이동합니다.
2.
다음 명령어를 입력하여 Zookeeper를 실행합니다.
bin\\windows\\zookeeper-server-start.bat config\\zookeeper.properties
Plain Text
복사
트러블 슈팅
F:\hanghae\FinalProjects\kafka_2.13-3.6.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
입력 줄이 너무 깁니다.
명령 구문이 올바르지 않습니다.
•
> 폴더 길이가 너무 길어서 문제 발생하였다.
F 드라이브 바로 밑에 설정해서 문제를 해결한다.
해결
서버가 정상적으로 부팅 됬음을 확인 할 수 있습니다.
2.2 Kafka 실행
1.
Kafka 폴더 위치에서 새로운 cmd 창을 열어 줍니다.
2.
다음 명령어를 입력하여 Kafka 서버를 실행합니다.
bin\\windows\\kafka-server-start.bat config\\server.properties
Plain Text
복사
3. Kafka Topic 생성 및 확인
1.
다음 명령어를 통해 Kafka Topic을 생성합니다.
bin\\windows\\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic dev-topic
Plain Text
복사
2.
다음 명령어를 통해 생성된 Topic을 확인합니다.
bin\\windows\\kafka-topics.bat --list --bootstrap-server localhost:9092
Plain Text
복사
bin\windows\kafka-topics.bat: Windows 환경에서 Kafka 토픽을 관리하기 위한 스크립트의 경로입니다.
--create: 토픽을 생성하는 옵션입니다.
--bootstrap-server localhost:9092: Kafka 서버의 호스트명과 포트를 지정합니다. 이 예에서는 로컬 컴퓨터의 9092 포트에 실행 중인 Kafka 서버에 연결합니다.
--topic book-donation-topic: 생성할 토픽의 이름을 지정합니다.
Kafka Topic이 정상적으로 생성되면, C:\\tmp\\kafka-logs 위치에 해당 Topic의 폴더가 생성됩니다. 이제 Kafka 환경이 성공적으로 구축되었으며, 데이터 스트리밍 및 메시징 작업을 시작할 수 있습니다.
프로젝트에서 카프카 적용
build.gradle
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.apache.kafka:kafka-clients'
Plain Text
복사
org.springframework.kafka:spring-kafka와 org.apache.kafka:kafka-clients 두 라이브러리 모두 Apache Kafka를 사용하기 위해 필요한 라이브러리들입니다. 하지만 각각의 라이브러리가 제공하는 기능과 목적이 조금 다릅니다.
1.
org.apache.kafka:kafka-clients
•
이 라이브러리는 Apache Kafka와 직접적으로 통신하기 위한 클라이언트 API를 제공합니다.
•
프로듀서(Producer)와 컨슈머(Consumer) 등 Kafka와 직접적으로 데이터를 주고받기 위한 기본적인 클라이언트 기능들을 포함하고 있습니다.
•
Java 언어로 Kafka를 사용할 때 기본적으로 필요한 라이브러리입니다.
2.
org.springframework.kafka:spring-kafka:
•
이 라이브러리는 Spring Framework 위에서 Kafka를 좀 더 편리하게 사용할 수 있도록 지원하는 기능들을 제공합니다.
•
Spring의 주요 개념인 Inversion of Control (IoC), Dependency Injection (DI), AOP(Aspect-Oriented Programming) 등을 활용하여 Kafka 클라이언트를 쉽게 구성하고 관리할 수 있도록 도와줍니다.
•
예를 들어, Kafka 메시지 리스너를 쉽게 구현하고 관리할 수 있으며, Kafka 트랜잭션 관리, 에러 처리 등을 Spring과 일관된 방식으로 처리할 수 있습니다.
org.apache.kafka:kafka-clients는 Kafka와 직접 통신하기 위한 기본적인 기능들을 제공하며, org.springframework.kafka:spring-kafka는 이러한 Kafka 클라이언트를 Spring Framework 내에서 좀 더 편리하게 사용할 수 있도록 추가적인 기능들과 통합 기능들을 제공합니다.
application.properties 설정
# Kafka
# Consumer 설정
# Kafka 컨슈머가 연결할 브로커의 주소입니다. 여기서는 로컬에서 실행 중인 Kafka 브로커의 주소와 포트를 지정하고 있습니다.
spring.kafka.consumer.bootstrap-servers=localhost:9092
# 컨슈머 그룹의 ID를 설정합니다. 이 ID는 Kafka 클러스터 내에서 이 컨슈머 그룹을 식별하는 데 사용됩니다.
spring.kafka.consumer.group-id=test-consumer-group
# 오프셋이 초기화되어야 하는 상황(예: 처음 시작하는 컨슈머, 또는 오프셋이 더 이상 유효하지 않은 경우)에 사용할 오프셋 초기화 정책을 설정합니다.
# 'earliest'는 토픽의 처음부터 메시지를 읽기 시작하겠다는 것을 의미합니다.
spring.kafka.consumer.auto-offset-reset=earliest
# Kafka로부터 메시지의 키를 역직렬화하는 데 사용할 클래스를 지정합니다. 여기서는 문자열 역직렬화 클래스를 사용하고 있습니다.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka로부터 메시지의 값을 역직렬화하는 데 사용할 클래스를 지정합니다. 여기서도 문자열 역직렬화 클래스를 사용하고 있습니다.
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Producer 설정
# Kafka 프로듀서가 연결할 브로커의 주소입니다. 컨슈머 설정과 마찬가지로 로컬에서 실행 중인 Kafka 브로커의 주소와 포트를 지정하고 있습니다.
spring.kafka.producer.bootstrap-servers=localhost:9092
# Kafka로 메시지를 보낼 때 메시지의 키를 직렬화하는 데 사용할 클래스를 지정합니다. 여기서는 문자열 직렬화 클래스를 사용하고 있습니다.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Kafka로 메시지를 보낼 때 메시지의 값을 직렬화하는 데 사용할 클래스를 지정합니다. 여기서도 문자열 직렬화 클래스를 사용하고 있습니다.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Plain Text
복사
카프카 테스트
Kafka Controller
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducerService producer;
@PostMapping("/send")
public void send(@RequestParam("message") String message){
producer.sendMessage("test",message);
}
}
Java
복사
Kafka Consumer Service
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test", groupId = "test-consumer-group")
public void consume(String message){
System.out.println("Received Message in group 'test-consumer-group': " + message);
}
}
Java
복사
Kafka Producer Service
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String topic, String message){
kafkaTemplate.send(topic,message);
}
}
Java
복사