Skip to content

Instantly share code, notes, and snippets.

@migue
Created October 13, 2014 21:28
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save migue/ce4cdb0925ac2eb7266d to your computer and use it in GitHub Desktop.
Small example of how you can send create/update rating's events to a Kafka broker
@Component(
immediate = true,
service = ModelListener.class
)
public class RatingsEntryModelListener extends BaseModelListener<RatingsEntry> {
@Override
public void onAfterCreate(RatingsEntry model)
throws ModelListenerException {
super.onAfterUpdate(model);
publishMessage(model);
}
@Override
public void onAfterUpdate(RatingsEntry model) throws ModelListenerException {
super.onAfterUpdate(model);
publishMessage(model);
}
protected Message buildMessage(RatingsEntry ratingsEntry) {
Message message = new Message();
message.put("topic", _ENTITY_EVENT);
message.setPayload(buildPayload(ratingsEntry));
return message;
}
protected String buildPayload(RatingsEntry ratingsEntry) {
StringBundler sb = new StringBundler(6);
sb.append(ratingsEntry.getCompanyId());
sb.append(StringPool.SEMICOLON);
sb.append(ratingsEntry.getUserId());
sb.append(StringPool.SEMICOLON);
sb.append(ratingsEntry.getEntryId());
sb.append(StringPool.SEMICOLON);
sb.append(ratingsEntry.getClassNameId());
sb.append(StringPool.SEMICOLON);
sb.append(ratingsEntry.getCreateDate());
return sb.toString();
}
protected void publishMessage(RatingsEntry ratingsEntry) {
_messageBus.sendMessage(
"kafka_destination", buildMessage(ratingsEntry));
}
@Reference
protected void setMessageBus(MessageBus messageBus) {
_messageBus = messageBus;
}
protected void unsetMessageBus(MessageBus messageBus) {
_messageBus = messageBus;
}
private static final String _ENTITY_EVENT = "ratings_entity_store_event";
private MessageBus _messageBus;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment