| @Slf4j | |
| @Component | |
| public class Receiver { | |
| @Autowired | |
| private CustomerOrdersRepository customerOrdersRepository; | |
| @Autowired | |
| private MongoTemplate mongoTemplate; | |
| private CountDownLatch latch = new CountDownLatch(1); | |
| public CountDownLatch getLatch() { | |
| return latch; | |
| } | |
| @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}") | |
| public void receiveCustomerOrder(CustomerOrders customerOrders) { | |
| log.info("received payload='{}'", customerOrders); | |
| latch.countDown(); | |
| customerOrdersRepository.save(customerOrders); | |
| } | |
| @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}") | |
| public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) { | |
| log.info("received payload='{}'", orderStatusChangeEvent); | |
| latch.countDown(); | |
| Criteria criteria = Criteria.where("orders.guid") | |
| .is(orderStatusChangeEvent.getGuid()); | |
| Query query = Query.query(criteria); | |
| Update update = new Update(); | |
| update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent()); | |
| mongoTemplate.updateFirst(query, update, "customer.orders"); | |
| } | |
| } |
// Scraped GitHub page footer (not part of the program):
// Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment