Skip to content

Instantly share code, notes, and snippets.

View ova2's full-sized avatar

Oleg Varaksin ova2

View GitHub Profile
@ova2
ova2 / rxjs-events.ts
Last active February 21, 2024 06:03
import 'rxjs/add/observable/fromEvent';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/switchMap'
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/takeUntil';
export class CanvasInteractionManager {
private destroy$ = new Subject<void>();
@Service
@Slf4j
@RequiredArgsConstructor
public class SspDefinitionenStore implements SspDefinitionConsumer {
private CacheMono<VersionedId, SspDefinition, SspDefinition> sspDefinitionCache;
private FluxSink<SspDefinition> sspDefinitionSink;
@PostConstruct
public void initialize() {
@Service
@Slf4j
@RequiredArgsConstructor
public class TopologyRepository {
private final CacheMono<TopologyRef, TopologyDto, TopologyDto> cache;
private final TopologyLoader topologyLoader;
private final TopologyCreator topologyCreator;
@Autowired
/**
* Gets cached values as Java Stream. Returned stream is not sorted.
*/
public Stream<OVALUE> getValues() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return cache.values().stream().flatMap(cachedValue -> cachedValue.getValue().stream());
} finally {
readLock.unlock();
/**
 * Looks up the value for the given key: the in-memory cache is consulted first and, if that
 * lookup completes empty, the value is loaded from the remote source instead.
 * A value loaded this way will be cached.
 * <p>
 * Both steps are wrapped in {@code Mono.defer}, so nothing is evaluated before subscription.
 *
 * @param key key of the requested value
 * @return a Mono for the cached value, or for the freshly loaded value on a cache miss
 */
public Mono<OVALUE> lookup(KEY key) {
    return Mono.defer(() -> {
        // Resolve the cached lookup first, on each subscription.
        final Mono<OVALUE> cached = getValueAsMono(key);
        // The miss handler is deferred as well, so it only runs when the cached lookup is empty.
        final Mono<OVALUE> onMiss = Mono.defer(() -> onCacheMissResume(key));
        return cached.switchIfEmpty(onMiss);
    });
}
private static class CacheMonoValue<VALUE> {
private Mono<VALUE> mono;
private Signal<VALUE> signal;
CacheMonoValue(Mono<VALUE> mono) {
this.mono = mono;
}
CacheMonoValue(Signal<VALUE> signal) {
@Slf4j
public class CacheMono<KEY, IVALUE, OVALUE> {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<KEY, CacheMonoValue<OVALUE>> cache = new HashMap<>();
/**
* External value supplier which should be provided if "valuePublisher" with "keyExtractor"
* are not set
*/
// Two independent sources that are combined pairwise, element by element.
Flux<String> cars = Flux.just("Fiat", "Audi", "BMW");
Flux<String> years = Flux.just("2009", "2018", "2015");

// Zip pairs the n-th car with the n-th year; each pair is then rendered as "<car> <year>".
Flux<String> carsWithYears = Flux.zip(cars, years)
    .map(pair -> pair.getT1() + " " + pair.getT2());

// Subscribing triggers the pipeline and prints every rendered pair on its own line.
carsWithYears.subscribe(System.out::println);
// Output
Fiat 2009
Audi 2018
BMW 2015
// Build the string asynchronously and block with get() until the result is available.
String result = CompletableFuture.supplyAsync(() -> "1 + 2 = " + (1 + 2)).get();
// Print the computed string followed by a line break.
System.out.println(result);
// Output
1 + 2 = 3
// Stage 1: compute the sum asynchronously.
CompletableFuture<Integer> sum = CompletableFuture.supplyAsync(() -> 1 + 2);
// Stage 2: turn the number into a readable equation.
CompletableFuture<String> equation = sum.thenApply(value -> "1 + 2 = " + value);
// Stage 3: print the equation once it is available.
CompletableFuture<Void> printed =
    equation.thenAccept(text -> System.out.println("Computation result: " + text));
// Stage 4: runs after the printing stage has completed.
CompletableFuture<Void> cf = printed.thenRun(() -> System.out.println("Finished"));
// Block until the whole pipeline has finished.
cf.get();
// Output
Computation result: 1 + 2 = 3
Finished