Skip to content

Instantly share code, notes, and snippets.

@aoudiamoncef
Last active February 25, 2024 23:31
Show Gist options
  • Save aoudiamoncef/92846f1a08ece240a387b291bcc15889 to your computer and use it in GitHub Desktop.
Save aoudiamoncef/92846f1a08ece240a387b291bcc15889 to your computer and use it in GitHub Desktop.
Spring Boot with Reactor Disposable, Disposable.Composite and Disposable.Swap

Spring Boot with Reactor Disposable, Disposable.Composite and Disposable.Swap

Reactor Disposable provides a mechanism for managing resources, subscriptions, or actions in a reactive application. There are three main types of Disposable: Disposable, Disposable.Composite, and Disposable.Swap.

1. Disposable

Definition: Represents a single resource that can be disposed of when it's no longer needed, typically used for cleanup purposes.

Real-world usage:

  • In a messaging application, you might use Disposable to manage a subscription to a message queue. When a user subscribes to a topic or channel, you create a disposable subscription. When the user unsubscribes or logs out, you can call dispose() on the subscription to release the resources and stop receiving messages.

  • In a file processing system, you could use Disposable to manage file streams. When a file is opened for processing, you create a disposable stream. When the processing is complete or an error occurs, you can call dispose() to close the stream and release system resources.

2. Disposable.Composite

Definition: Represents a collection of disposables that can be managed as a single unit, allowing for the disposal of multiple resources at once.

Real-world usage:

  • In a financial trading application, you might have multiple subscriptions to market data feeds, trade execution services, and risk management systems. You could use a Disposable.Composite to manage all of these subscriptions together. When the trading system is shut down or no longer needed, you can dispose of the composite to release all the associated resources.

  • In an IoT (Internet of Things) system, you might have multiple sensor readings or device connections that need to be managed. You could use a Disposable.Composite to manage all of these connections together. When the IoT device is turned off or removed from the network, you can dispose of the composite to release all the associated connections.

3. Disposable.Swap

Definition: Allows for the dynamic replacement of one disposable with another, enabling seamless transitions between different resources or actions.

Real-world usage:

  • In a chat application, you might use Disposable.Swap to manage the connection to a chat server. When a user switches chat rooms or channels, you can replace the current chat connection with a new one using Disposable.Swap. This allows users to seamlessly switch between different chat rooms without interrupting their conversations.

  • In a gaming server, you could use Disposable.Swap to manage player sessions. When a player logs in or joins a game, you create a disposable session. When the player logs out or leaves the game, you can replace the current session with a new one using Disposable.Swap. This allows for smooth transitions between player sessions without affecting the overall game state.

Disposable.Swap vs switchMap

  • switchMap: Automatically switches between Publishers, discarding previous ones, without user intervention for cancellation handling.
  • Disposable.Swap: Allows manual cancellation handling for seamless replacement of disposables, unlike switchMap.

Example Usage with Spring Boot

The UserService class provided in this gist demonstrates the usage of Disposable.Swap from Project Reactor. This class is a service for searching and log the users asynchronously and managing the lifecycle of search operations.

Usage Scenario

When a new search operation is initiated, the searchUsers(String query) method is called with a search query. Internally, this method performs the search asynchronously using the provided UserRepository. The result of the search is logged, and any errors encountered during the search are handled gracefully.

Disposable.Swap

The Disposable.Swap instance named disposableSwap is used to manage the lifecycle of search operations. Each time a new search is initiated, the previous search operation's Disposable is replaced with the new one atomically. This ensures that resources associated with the previous search are properly disposed of before starting a new search.

Cleanup on Service Destruction

To clean up resources when the service is destroyed, the cleanUp() method is annotated with @PreDestroy. This method disposes of the Disposable.Swap instance if it exists and is not already disposed of, ensuring proper cleanup of resources associated with search operations.

References

package com.maoudia;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* A fake implementation of UserRepository for demonstration purposes.
*/
@Component
public class FakeUserRepository implements UserRepository {
private static final Random random = new Random();
/**
* Searches for users by username.
*
* @param username The username to search for.
* @return A Flux-emitting User objects that match the provided username.
*/
@Override
public Flux<User> searchByUsername(String username) {
// Fake implementation of user search by username
List<User> users = Arrays.asList(
new User("user1", "John", "Doe", 30),
new User("user2", "Jane", "Smith", 25),
new User("user3", "Alice", "Johnson", 35),
new User("user4", "Michael", "Brown", 40),
new User("user5", "Emma", "Wilson", 28)
);
// Introduce random latency before emitting each user
return Flux.fromIterable(users)
.filter(user -> user.username().startsWith(username)) // Filter users whose usernames start with the provided search term
.delayElements(Duration.ofMillis(random.nextInt(1000) + 1), Schedulers.boundedElastic()); // Random delay up to 1 second (adjust as needed)
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.maoudia</groupId>
<artifactId>app</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>maoudia-app</name>
<description>MAOUDIA APP</description>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.maoudia;
/**
* Represents a user record with basic information.
*
* @param username The username of the user.
* @param firstName The first name of the user.
* @param lastName The last name of the user.
* @param age The age of the user.
*/
public record User(
String username,
String firstName,
String lastName,
int age) {}
package com.maoudia;
import reactor.core.publisher.Flux;
/**
* Defines methods for searching users by username.
*/
public interface UserRepository {
/**
* Searches for users by username.
*
* @param username The username to search for.
* @return A Flux-emitting User objects that match the provided username.
*/
Flux<User> searchByUsername(String username);
}
package com.maoudia;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Schedulers;
import java.util.UUID;
/**
* Service class for managing user-related operations.
*/
@Service
public class UserService {
private static final Logger LOGGER = LoggerFactory.getLogger(UserService.class);
private final UserRepository userRepository;
private final Disposable.Swap disposableSwap;
/**
* Constructs a UserService with the provided UserRepository.
*
* @param userRepository The repository for user data.
*/
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
this.disposableSwap = Disposables.swap();
}
/**
* Searches for users asynchronously by the given query.
*
* @param query The search query.
* @return A Disposable representing the ongoing search operation.
*/
public Disposable searchUsers(String query) {
// Perform the new search asynchronously and log the results
Disposable nextDisposable = this.userRepository.searchByUsername(query)
.doOnCancel(() -> LOGGER.warn("User search for '{}' cancelled", query))
.doOnSubscribe(s -> LOGGER.info("User search for '{}' subscribed: {}", query, UUID.randomUUID()))
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
user -> LOGGER.info("User search '{}' found: {}", query, user.username()),
error -> LOGGER.error("Error during User search for '{}'", query, error),
() -> LOGGER.info("User search for '{}' completed", query)
);
// Atomically set the next Disposable on this container and dispose the previous one (if any).
disposableSwap.update(nextDisposable);
return disposableSwap;
}
/**
* Cleans up resources when the service is destroyed.
*/
@PreDestroy
public void cleanUp() {
if (!this.disposableSwap.isDisposed()) {
this.disposableSwap.dispose();
LOGGER.info("Disposed of disposableSwap");
}
}
}
package com.maoudia;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.Disposable;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class UserServiceTest {
@Autowired
private UserService userService;
@Test
@DisplayName("Single search for users by username")
void singleSearchUsersByUsername() {
// When
Disposable disposable = userService.searchUsers("user1");
// Then
Assertions.assertThat(disposable)
.isNotNull()
.isInstanceOf(Disposable.Swap.class);
}
@Test
@DisplayName("Multiple searches for users by username")
void multipleSearchUsersByUsername() {
// When
Disposable disposable1 = userService.searchUsers("user5");
Assertions.assertThat(disposable1).isNotNull()
.isInstanceOf(Disposable.Swap.class);
Disposable.Swap disposableSwap1 = (Disposable.Swap) disposable1;
disposable1 = disposableSwap1.get();
Disposable disposable2 = userService.searchUsers("user3");
Assertions.assertThat(disposable2).isNotNull()
.isInstanceOf(Disposable.Swap.class);
Disposable.Swap disposableSwap2 = (Disposable.Swap) disposable2;
disposable2 = disposableSwap2.get();
Disposable disposable3 = userService.searchUsers("user");
// Then
Assertions.assertThat(disposable1.isDisposed()).isTrue();
Assertions.assertThat(disposable2.isDisposed()).isTrue();
Assertions.assertThat(disposable3).isNotNull()
.isInstanceOf(Disposable.Swap.class);
}
@Test
@DisplayName("Clean up resources")
void cleanUpResources() {
// Given
Disposable disposable = userService.searchUsers("user1");
// When
userService.cleanUp();
// Then
Assertions.assertThat(disposable.isDisposed()).isTrue();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment