// <div class="search-result" *ngFor="let result of results$ | async">
// ...
// </div>
class SearchBarComponent implements OnInit {
results$: Observable
subject = new Subject()
ngOnInit() {
this.results$ = this.subject.pipe(
debounceTime(1000),
map(text => this.$http.get("/api/search?q=" + text))
)
}
search($evt) {
this.subject.next($evt.target.value)
}
}
fromEvent(keyPressElement, 'keypress').pipe(
// Input handlers to take input value for searching
switchMap(value => {
return from(fetchCakesByName(value)).pipe(
// Only make an API request for searching after 0.5s since user stopped pressing a key
debounce(0.5),
// Retry 3 times if an API request fails
retry(3),
// Cancel the search when press Esc or clear input
takeUntil(value => value === 'Esc' || value === ''),
// Handling errors by showing an error alerts
catchError(error => alert('Error in getting cakes'))
)
}),
)
With this implementation we dont need to keep state of all subscription for cleaning
export class RXclean implements OnDestroy, OnInit {
private destroy$ = new Subject();
constructor(private $fetchA, $fetchB) {}
ngOnInit() {
this.$fetchA.getAll().pipe(
map(elems => ... ),
takeUntil(this.destroy$)
).subscribe(...);
this.$fetchB.sync().pipe(
map(elems => this.parse(elems)),
takeUntil(this.destroy$)
).subscribe(...);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Let's say, you want to listen to drag - events (all mouse-move events while having mouse clicked). You need a state mousedown.
let mouseDown = false;
document.addEventListener('mousedown', (ev) => mouseDown = true);
document.addEventListener('mouseup', (ev) => mouseDown = false);
document.addEventListener('mousemove', (ev) => {
if(mouseDown) {
// do something with it
}
}
Here is the rxjs - way
let down$ = fromEvent(document, 'mousedown')
let up$ = fromEvent(document, 'mouseup')
let move$ = fromEvent(document, 'mousemove')
let drag$ = down$.pipe(
switchMap(start => move$.pipe(
map(move => move.clientX - start.clientX),
takeUntil(up$)
)
)
)
It emits each value of the input stream with a time period delay
Delay executing a function. It will reduce the notifications of an event that fires multiple times.
- It starts by emitting the first values of the input stream
- Then, it limits the rate of values to at most one per time period
During lapse of time, bunch a series of sequential calls to a function into a single call to that function. It ensures that one notification is made for an event that fires multiple times (in a lapse of time). demo
When all observables complete, emit an array containing the last emitted value from each.
Subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.
When any observable emits a value, emit the latest value from each.