Orders Microservice 와 Catalogs Microservice 에 Kafka Topic의 적용


데이터 동기화 1 Orders → Catalogs

Catalogs Microservice 수정


catalog-service : build.gradle

// 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'

catalog-service : messagequeue.KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    
    // Consumer 설정
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    
    // Listener 등록 
    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        // 위에 설정한 Consumer 정보를 Listener에 등록
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); 
        return kafkaListenerContainerFactory;
    } 
}

catalog-service : messagequeue.KafkaConsumer

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {

    private final CatalogRepository catalogRepository;

    // Topic = example-catalog-topic
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
        if(entity != null){
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            catalogRepository.save(entity);
        }
    }
}

Orders Microservice 수정


order-service : build.gradle