Last active
June 15, 2018 16:40
-
-
Save garystafford/cc3c4e55bc291e5435eccdd679d03015 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Slf4j | |
@Component | |
public class Receiver { | |
@Autowired | |
private CustomerOrdersRepository customerOrdersRepository; | |
@Autowired | |
private MongoTemplate mongoTemplate; | |
private CountDownLatch latch = new CountDownLatch(1); | |
public CountDownLatch getLatch() { | |
return latch; | |
} | |
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}") | |
public void receiveCustomerOrder(CustomerOrders customerOrders) { | |
log.info("received payload='{}'", customerOrders); | |
latch.countDown(); | |
customerOrdersRepository.save(customerOrders); | |
} | |
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}") | |
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) { | |
log.info("received payload='{}'", orderStatusChangeEvent); | |
latch.countDown(); | |
Criteria criteria = Criteria.where("orders.guid") | |
.is(orderStatusChangeEvent.getGuid()); | |
Query query = Query.query(criteria); | |
Update update = new Update(); | |
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent()); | |
mongoTemplate.updateFirst(query, update, "customer.orders"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment