A small example of how to send ratings-entry create/update events to a Kafka broker.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component( | |
immediate = true, | |
service = ModelListener.class | |
) | |
public class RatingsEntryModelListener extends BaseModelListener<RatingsEntry> { | |
@Override | |
public void onAfterCreate(RatingsEntry model) | |
throws ModelListenerException { | |
super.onAfterUpdate(model); | |
publishMessage(model); | |
} | |
@Override | |
public void onAfterUpdate(RatingsEntry model) throws ModelListenerException { | |
super.onAfterUpdate(model); | |
publishMessage(model); | |
} | |
protected Message buildMessage(RatingsEntry ratingsEntry) { | |
Message message = new Message(); | |
message.put("topic", _ENTITY_EVENT); | |
message.setPayload(buildPayload(ratingsEntry)); | |
return message; | |
} | |
protected String buildPayload(RatingsEntry ratingsEntry) { | |
StringBundler sb = new StringBundler(6); | |
sb.append(ratingsEntry.getCompanyId()); | |
sb.append(StringPool.SEMICOLON); | |
sb.append(ratingsEntry.getUserId()); | |
sb.append(StringPool.SEMICOLON); | |
sb.append(ratingsEntry.getEntryId()); | |
sb.append(StringPool.SEMICOLON); | |
sb.append(ratingsEntry.getClassNameId()); | |
sb.append(StringPool.SEMICOLON); | |
sb.append(ratingsEntry.getCreateDate()); | |
return sb.toString(); | |
} | |
protected void publishMessage(RatingsEntry ratingsEntry) { | |
_messageBus.sendMessage( | |
"kafka_destination", buildMessage(ratingsEntry)); | |
} | |
@Reference | |
protected void setMessageBus(MessageBus messageBus) { | |
_messageBus = messageBus; | |
} | |
protected void unsetMessageBus(MessageBus messageBus) { | |
_messageBus = messageBus; | |
} | |
private static final String _ENTITY_EVENT = "ratings_entity_store_event"; | |
private MessageBus _messageBus; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment