Skip to content

Instantly share code, notes, and snippets.

@alexandre-jacquot-ptl
Last active June 28, 2021 11:26
Show Gist options
  • Save alexandre-jacquot-ptl/3b2cedf7fbf667e181190cdec9c6b5d0 to your computer and use it in GitHub Desktop.
Save alexandre-jacquot-ptl/3b2cedf7fbf667e181190cdec9c6b5d0 to your computer and use it in GitHub Desktop.
r2dbc notification service
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final ConnectionFactory connectionFactory;
private final Set<NotificationTopic> watchedTopics = new HashSet<>();
private PostgresqlConnection connection;
private ObjectMapper objectMapper;
/**
* Listen to a postgreSQL topic
* @param topic Topic to which the connection needs to subscribe
* @param clazz class of the notification parameter (used for deserialization)
* @return the notification parameters
*/
public <T> Flux<T> listen(final NotificationTopic topic, final Class<T> clazz) {
// Listen to the topic
if (!watchedTopics.contains(topic)) {
synchronized (watchedTopics) {
if (!watchedTopics.contains(topic)) {
executeListenStatement(topic);
watchedTopics.add(topic);
}
}
}
// Get the notifications
return getConnection().getNotifications()
.filter(notification -> topic.name().equals(notification.getName()) && notification.getParameter() != null)
.handle((notification, sink) -> {
final String json = notification.getParameter();
if (!StringUtils.isBlank(json)) {
try {
sink.next(objectMapper.readValue(json, clazz));
} catch (JsonProcessingException e) {
log.error(String.format("Problem deserializing an instance of [%s] " +
"with the following json: %s ", clazz.getSimpleName(), json), e);
Mono.error(new NotificationDeserializationException(topic, e));
}
}
});
}
/**
* Unlisten from a postgreSQL topic
* @param topic Topic to which the connection needs to unsubscribe
*/
public void unlisten(final NotificationTopic topic) {
if (watchedTopics.contains(topic)) {
synchronized (watchedTopics) {
if (watchedTopics.contains(topic)) {
executeUnlistenStatement(topic);
watchedTopics.remove(topic);
}
}
}
}
@PostConstruct
private void postConstruct() {
this.objectMapper = createObjectMapper();
}
@PreDestroy
private void preDestroy() {
this.getConnection().close().subscribe();
}
/**
* Execute the SQL statement used to listen to a given topic
* @param topic Name of the topic to listen
*/
private void executeListenStatement(final NotificationTopic topic) {
// Topic in upper-case must be surrounded by quotes
getConnection().createStatement(String.format("LISTEN \"%s\"", topic)).execute().subscribe();
}
/**
* Execute the SQL statement used to unlisten from a given topic
* @param topic Name of the topic to unlisten
*/
private void executeUnlistenStatement(final NotificationTopic topic) {
// Topic in upper-case must be surrounded by quotes
getConnection().createStatement(String.format("UNLISTEN \"%s\"", topic)).execute().subscribe();
}
/**
* Get or create a PostgreSQL database connection
* @return the connection created synchronously
*/
private PostgresqlConnection getConnection() {
if (connection == null) {
synchronized (NotificationService.class) {
if (connection == null) {
connection = Mono.from(connectionFactory.create())
.cast(PostgresqlConnection.class)
.block();
}
}
}
return this.connection;
}
/**
* Create an object mapper to convert the json notification
* parameters to entities
* @return the object mapper
*/
private ObjectMapper createObjectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
// This strategy is needed to match the DB column names with the entity field names
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE)
// Ignore the missing properties
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment