Last active
June 28, 2021 11:26
-
-
Save alexandre-jacquot-ptl/3b2cedf7fbf667e181190cdec9c6b5d0 to your computer and use it in GitHub Desktop.
r2dbc notification service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Service | |
@RequiredArgsConstructor | |
@Slf4j | |
public class NotificationService { | |
private final ConnectionFactory connectionFactory; | |
private final Set<NotificationTopic> watchedTopics = new HashSet<>(); | |
private PostgresqlConnection connection; | |
private ObjectMapper objectMapper; | |
/** | |
* Listen to a postgreSQL topic | |
* @param topic Topic to which the connection needs to subscribe | |
* @param clazz class of the notification parameter (used for deserialization) | |
* @return the notification parameters | |
*/ | |
public <T> Flux<T> listen(final NotificationTopic topic, final Class<T> clazz) { | |
// Listen to the topic | |
if (!watchedTopics.contains(topic)) { | |
synchronized (watchedTopics) { | |
if (!watchedTopics.contains(topic)) { | |
executeListenStatement(topic); | |
watchedTopics.add(topic); | |
} | |
} | |
} | |
// Get the notifications | |
return getConnection().getNotifications() | |
.filter(notification -> topic.name().equals(notification.getName()) && notification.getParameter() != null) | |
.handle((notification, sink) -> { | |
final String json = notification.getParameter(); | |
if (!StringUtils.isBlank(json)) { | |
try { | |
sink.next(objectMapper.readValue(json, clazz)); | |
} catch (JsonProcessingException e) { | |
log.error(String.format("Problem deserializing an instance of [%s] " + | |
"with the following json: %s ", clazz.getSimpleName(), json), e); | |
Mono.error(new NotificationDeserializationException(topic, e)); | |
} | |
} | |
}); | |
} | |
/** | |
* Unlisten from a postgreSQL topic | |
* @param topic Topic to which the connection needs to unsubscribe | |
*/ | |
public void unlisten(final NotificationTopic topic) { | |
if (watchedTopics.contains(topic)) { | |
synchronized (watchedTopics) { | |
if (watchedTopics.contains(topic)) { | |
executeUnlistenStatement(topic); | |
watchedTopics.remove(topic); | |
} | |
} | |
} | |
} | |
@PostConstruct | |
private void postConstruct() { | |
this.objectMapper = createObjectMapper(); | |
} | |
@PreDestroy | |
private void preDestroy() { | |
this.getConnection().close().subscribe(); | |
} | |
/** | |
* Execute the SQL statement used to listen to a given topic | |
* @param topic Name of the topic to listen | |
*/ | |
private void executeListenStatement(final NotificationTopic topic) { | |
// Topic in upper-case must be surrounded by quotes | |
getConnection().createStatement(String.format("LISTEN \"%s\"", topic)).execute().subscribe(); | |
} | |
/** | |
* Execute the SQL statement used to unlisten from a given topic | |
* @param topic Name of the topic to unlisten | |
*/ | |
private void executeUnlistenStatement(final NotificationTopic topic) { | |
// Topic in upper-case must be surrounded by quotes | |
getConnection().createStatement(String.format("UNLISTEN \"%s\"", topic)).execute().subscribe(); | |
} | |
/** | |
* Get or create a PostgreSQL database connection | |
* @return the connection created synchronously | |
*/ | |
private PostgresqlConnection getConnection() { | |
if (connection == null) { | |
synchronized (NotificationService.class) { | |
if (connection == null) { | |
connection = Mono.from(connectionFactory.create()) | |
.cast(PostgresqlConnection.class) | |
.block(); | |
} | |
} | |
} | |
return this.connection; | |
} | |
/** | |
* Create an object mapper to convert the json notification | |
* parameters to entities | |
* @return the object mapper | |
*/ | |
private ObjectMapper createObjectMapper() { | |
return new ObjectMapper() | |
.registerModule(new JavaTimeModule()) | |
// This strategy is needed to match the DB column names with the entity field names | |
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE) | |
// Ignore the missing properties | |
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment