Skip to content

Instantly share code, notes, and snippets.

@fpillet
Last active August 29, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fpillet/6ccd2c79a0eb3ebae796 to your computer and use it in GitHub Desktop.
Save fpillet/6ccd2c79a0eb3ebae796 to your computer and use it in GitHub Desktop.
Two RACSignal operations I use that are not in the main ReactiveCocoa distribution
//
// Created by Florent Pillet on 30/01/15.
//
#import <Foundation/Foundation.h>
@interface RACSignal (FPOperations)
/// Delivers the receiver's latest `next`s with a minimum of `interval`
/// seconds between two values. Of `next` values produced by the receiver
/// in a period of `interval` seconds, only the last one is kept and sent
/// by the return signal; intermediate values are discarded.
///
/// interval - The interval in which values are grouped into one buffer.
/// scheduler - The scheduler upon which the returned signal will deliver its
/// values. This must not be nil or +[RACScheduler
/// immediateScheduler].
///
/// Returns a signal which sends the latest value at (at least) each interval on `scheduler`.
/// When the receiver completes, any unsent latest value will be sent immediately before
/// the signal completes.
- (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler;
/// Resubscribes to the receiver when it completes, after waiting for the specified delay.
/// This is the equivalent of the standard -repeat operation, with the addition of a delay to
/// avoid fast cycles for signals that complete quickly.
///
/// delay - The delay after which the receiver is resubscribed to, on the scheduler that is
/// current at the time the receiver completes / errors.
///
/// The returned signal will pass all `next' to its subscribers. On `error' it will error too,
/// effectively breaking the re-subscription cycle.
- (RACSignal *)repeatAfter:(NSTimeInterval)delay;
@end
//
// Created by Florent Pillet on 30/01/15.
//
#import "RACSignal+FPOperations.h"
@implementation RACSignal (FPOperations)
// Code extracted from ReactiveCococa and modified to suit my needs
// Subscribes to the given signal with the given blocks.
//
// If the signal errors or completes, the corresponding block is invoked. If the
// disposable passed to the block is _not_ disposed, then the signal is
// subscribed to again.
static RACDisposable *subscribeForever (RACSignal *signal, NSTimeInterval delay, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
next = [next copy];
error = [error copy];
completed = [completed copy];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
@autoreleasepool {
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
[RACScheduler.currentScheduler afterDelay:delay schedule:recurse];
} completed:^{
@autoreleasepool {
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
[RACScheduler.currentScheduler afterDelay:delay schedule:recurse];
}];
[selfDisposable addDisposable:subscriptionDisposable];
};
// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^{
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
});
return compoundDisposable;
}
- (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
__block id latestValue = nil;
void (^sendLatest)() = ^{
@synchronized (timerDisposable) {
[timerDisposable.disposable dispose];
timerDisposable.disposable = nil;
if (latestValue) {
[subscriber sendNext:latestValue];
latestValue = nil;
}
}
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (timerDisposable) {
if (timerDisposable.disposable != nil)
latestValue = x;
else {
[subscriber sendNext:x];
timerDisposable.disposable = [scheduler afterDelay:interval schedule:sendLatest];
}
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
sendLatest();
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[timerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -sampleAtInterval: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
- (RACSignal *)repeatAfter:(NSTimeInterval)delay
{
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return subscribeForever(self, delay,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// Resubscribe.
});
}] setNameWithFormat:@"[%@] -repeatAfter: %f", self.name, delay];
}
@end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment