Blog

트러블 슈팅 : Spring Kafka를 사용하여 메시지를 송수신하는 과정에서 org.springframework.kafka.KafkaException: Seek to current after exception 오류가 발생

tag
트러블슈팅
날짜
2023/10/28
생성 일시
2023/10/29 07:12
작성자

문제 상황

Spring Kafka를 사용하여 메시지를 송수신하는 과정에서 org.springframework.kafka.KafkaException: Seek to current after exception 오류가 발생하였습니다. 이 오류는 Kafka 컨슈머가 메시지를 정상적으로 처리하지 못했을 때 발생합니다.

문제 상황 분석

오류의 원인은 프로듀서와 컨슈머 간의 데이터 직렬화 및 역직렬화 설정이 서로 불일치하였기 때문입니다. 프로듀서는 JsonSerializer를 사용하여 객체를 JSON 문자열로 변환하여 메시지를 전송하였으나, 컨슈머는 StringDeserializer를 사용하여 메시지를 문자열로만 변환하려고 시도하였습니다. 이로 인해 데이터 타입이 불일치하게 되어 메시지 처리 중 예외가 발생하였습니다.

해결 방법

1.
직렬화 설정 일치 프로듀서와 컨슈머 모두 StringSerializerStringDeserializer를 사용하도록 설정을 변경하여, 데이터의 직렬화 및 역직렬화 방식을 일치시켰습니다.
2.
예외 처리 @KafkaListener 어노테이션이 적용된 메서드 내에서 예외가 발생할 경우 이를 적절히 처리하여, 오류 발생 시에도 시스템이 안정적으로 동작하도록 하였습니다.

코드 변경

아래의 코드에서는 메시지를 수신한 후 JSON 문자열을 AdminUserManagemetKafkaDto 객체로 변환하여 처리하고 있습니다. 오류가 발생하지 않도록 직렬화 및 역직렬화 설정을 일치시켰습니다.
@KafkaListener(topics = "bookDonationEventApplyInput", groupId = "event-apply-Input-consumer-group") public void AdminUserManagementConsume(String message) throws JsonProcessingException { System.out.println("Received Message in group 'test-consumer-group1': " + message); ObjectMapper objectMapper = new ObjectMapper(); AdminUserManagemetKafkaDto kafkaDto = objectMapper.readValue(message, AdminUserManagemetKafkaDto.class); Page<User> users = userService.findUsersByUsernameAndRoleV1(kafkaDto.getUserName(), kafkaDto.getUserRole() , PageRequest.of(kafkaDto.getPage(), kafkaDto.getPageSize())); List<UserResponseDto> userResponseDtos = users.stream().map(UserResponseDto::new).toList(); userResponseDtos.forEach(System.out::println); String jsonString = objectMapper.writeValueAsString(userResponseDtos); producer.sendMessage("bookDonationEventApplyOutput", jsonString); }
Java
복사

해결

설정을 일치시킨 후 Kafka를 통한 메시지 송수신이 정상적으로 이루어졌으며, Seek to current after exception 오류가 발생하지 않았습니다. 또한, 프론트쪽 서버의 컨슈머에서도 메시지를 정상적으로 전달 받았다는 것을 확인할 수 있었습니다.

정리

Kafka를 사용할 때 설정 불일치는 메시지 처리 실패로 이어질 수 있으므로 주의 깊게 확인하고 일치시켜야 한다는 것을 알 수 있었습니다.