catalog-service : build.gradle
// 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
catalog-service : messagequeue.KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// Consumer 설정
@Bean
public ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
// Listener 등록
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
// 위에 설정한 Consumer 정보를 Listener에 등록
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
catalog-service : messagequeue.KafkaConsumer
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
// Topic = example-catalog-topic
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
if(entity != null){
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
catalogRepository.save(entity);
}
}
}
order-service : build.gradle