Spring Boot - kafka producer
1. 스프링부트 프로젝트 생성
2. application.properties
spring.application.name=kafka
# Kafka broker address
spring.kafka.bootstrap-servers=카프카서버주소
# Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka 메시지 키에 대한 직렬화 클래스를 StringSerializer.로 설정합니다. 이는 메시지 키가 문자열로 직렬화됨을 나타냅니다.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka 메시지 값에 대한 직렬화기 클래스를 StringSerializer.로 설정합니다. 이는 메시지 값도 문자열로 직렬화됨을 나타냅니다.
3.Configuration
package com.example.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
org.apache.kafka.clients.producer.ProducerConfig
이 클래스는 Kafka 프로듀서 구성 속성에 대한 액세스를 제공합니다.
org.springframework.beans.factory.annotation.Value
이 Spring 어노테이션을 사용하면 애플리케이션 속성 파일에서 값을 삽입할 수 있습니다.
org.springframework.context.annotation.Bean
이 Spring 주석은 Spring 컨테이너가 관리하는 빈을 생성하는 메서드를 표시합니다.
org.springframework.context.annotation.Configuration
이 주석은 이 클래스가 Spring 구성 클래스임을 나타냅니다.
org.springframework.kafka.core.DefaultKafkaProducerFactory
이 클래스는 Kafka 프로듀서를 위한 프로듀서 팩토리를 만드는 데 사용됩니다.
org.springframework.kafka.core.KafkaTemplate
이 클래스는 Kafka 토픽에 메시지를 보내는 편리한 방법을 제공합니다.
@Configuration
public class KafkaConfig {
@Configuration
이 주석은 클래스를 Spring 구성 클래스로 표시하여 빈 정의와 구성 로직을 포함하고 있음을 나타냅니다.
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value
어노테이션을 사용하여 application.properties 파일에 정의된 value를 주입받습니다.
@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
return new DefaultKafkaProducerFactory<>(config);
}
ProducerFactory는 Kafka 프로듀서를 생성하는 데 사용됩니다.
Map<String, Object> config를 통해 Kafka 프로듀서의 설정을 정의합니다.
부트스트랩 서버, 키 직렬화기, 값 직렬화기를 설정합니다.
DefaultKafkaProducerFactory를 사용해 설정을 기반으로 프로듀서 팩토리를 생성합니다.
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
KafkaTemplate은 Kafka에 메시지를 전송하는 데 사용되는 클래스입니다.
생성자에 producerFactory()를 전달하여 위에서 정의한 프로듀서 팩토리를 사용합니다.
4.Service
package com.example.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.CompletableFuture;
@Service
@Slf4j
public class KafkaStringService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaStringService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("메시지 전송 실패: {}", ex.getMessage());
} else {
log.info("메시지 전송 성공: {}", result.getRecordMetadata().toString());
}
});
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
@Slf4j Lombok 라이브러리에서 주석을 가져옵니다. 이 주석을 사용하면 수동으로 생성하지 않고도 클래스 내에서 logger 객체를 쉽게 사용할 수 있습니다.
import org.springframework.beans.factory.annotation.Autowired;
@AutowiredSpring에서 주석을 가져옵니다. 이 주석은 Spring에 자동으로 이 클래스에 종속성을 주입하라고 알리는 데 사용됩니다.
import org.springframework.kafka.core.KafkaTemplate;
KafkaTemplateSpring Kafka 라이브러리에서 클래스를 가져옵니다. 이 클래스는 Kafka 토픽에 메시지를 보내는 편리한 방법을 제공합니다.
import org.springframework.stereotype.Service;
@Service Spring에서 주석을 가져옵니다. 이 주석은 클래스를 Spring 서비스로 표시하며, 이는 애플리케이션의 다른 부분에 주입될 수 있습니다.
import org.springframework.kafka.support.SendResult;
SendResult Spring Kafka에서 클래스를 가져옵니다. 이 클래스는 성공 또는 실패와 같은 정보를 포함하여 Kafka 토픽에 메시지를 보낸 결과를 보유합니다.
import java.util.concurrent.CompletableFuture;
이것은 CompletableFutureJava의 동시성 라이브러리에서 클래스를 가져옵니다. 이 클래스는 비동기 계산과 그 최종 결과를 나타냅니다.
@Service
public class KafkaStringService {
@Service
어노테이션을 사용하여 Spring의 서비스 컴포넌트로 등록합니다.
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaStringService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
KafkaTemplate<String, String> 타입의 필드를 정의하고, 생성자를 통해 의존성 주입을 받습니다.
KafkaTemplate은 이전에 KafkaConfig에서 설정했던 bean으로 문자열 키와 문자열 값을 사용하는 Kafka 프로듀서입니다.
public void sendMessage(String topic, String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("메시지 전송 실패: {}", ex.getMessage());
} else {
log.info("메시지 전송 성공: {}", result.getRecordMetadata().toString());
}
});
}
sendMessage 메서드는 kafkaTemplate.send(topic, message)를 호출하여 비동기적으로 메시지를 전송하고, CompletableFuture를 반환받습니다.
whenComplete 메서드를 사용하여 전송 결과를 처리합니다.
예외가 발생하면 에러 로그를 기록합니다.
성공적으로 전송되면 성공 로그를 기록합니다.
5. Controller
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import java.time.LocalDateTime;
@RestController
public class MessageController {
private final KafkaStringService kafkaStringService;
@Autowired
public MessageController(KafkaStringService kafkaStringService) {
this.kafkaStringService = kafkaStringService;
}
@GetMapping("/test")
public String test() {
kafkaStringService.sendMessage("test1", LocalDateTime.now().toString());
return "메시지 전송 중...";
}
}
@Service 어노테이션으로 지정한 KafkaStringService를
KafkaStringService 타입으로 정의한 필드에 @Autowired로 생성자를 통해 의존성 주입 합니다.
이 서비스는 메시지를 Kafka로 전송하는 기능을 제공합니다.
http://localhost8080/test 를 브라우저로 호출하면 test1이라는 토픽에 텍스트데이터가 전송됩니다.
참고 링크 :
https://youtu.be/Nh3AZx8D9KY?si=Y-M4S0d1YRlWl4nt
https://youtu.be/KTXeWIHUQnw?si=P9_K32mOhy351gDk
'STUDY > SpringBoot' 카테고리의 다른 글
Spring Boot - grpc server 예제 (0) | 2024.10.23 |
---|---|
Spring Boot - grpc client 바이너리 데이터 전송 (1) | 2024.10.23 |
[Springboot] 22-09-29 삭제·상세·수정 기능 수업 -3 (0) | 2022.09.29 |
[Springboot] 22-09-29 페이징 기능 수업 -2 (0) | 2022.09.29 |
[Springboot] 22-09-28 페이징 기능 수업 -1 (0) | 2022.09.28 |