Skip to content

Instantly share code, notes, and snippets.

@atticoos
Last active February 2, 2017 03:54
Show Gist options
  • Save atticoos/aa5b24aed7ac7193f1760eb98da31df5 to your computer and use it in GitHub Desktop.
Save atticoos/aa5b24aed7ac7193f1760eb98da31df5 to your computer and use it in GitHub Desktop.
//
// AppDelegate.m
// RXTesting
//
// Created by Atticus White on 2/1/17.
// Copyright © 2017 Atticus White. All rights reserved.
//
#import "AppDelegate.h"
#import <ReactiveCocoa/ReactiveCocoa.h>
#import "RACSignal+Buffer.h"
@interface AppDelegate ()
/// Flag observed through RACObserve in -application:didFinishLaunchingWithOptions:;
/// -start flips it to true after its delay, which produces the start signal.
@property (nonatomic, assign) BOOL started;
@end
/**
* Objective: Build a buffer from a signal (source-signal) until another signal (start-signal) has arrived.
* Until the start-signal arrives, the source-signal actions should queue in a buffer.
* Once the start-signal arrives, they should be sent one-by-one to the subscriber.
* Then all actions from the source-signal, after the start-signal has been acquired, should stream as normal.
*
* Example:
*
* SourceSignal   StartSignal   Buffer        Subscriber
* 'A'            -             ['A']         -
* 'B'            -             ['A', 'B']    -
* -              1             []            'A' then 'B'
* 'C'            -             []            'C'
* 'D'            -             []            'D'
*
*/
@implementation AppDelegate

/**
 * Wires up the buffering demo: merges four notification signals, buffers them
 * behind a signal derived from the `started` property, logs every value that
 * reaches the subscriber, posts 'FOO' and 'BAR' immediately, and then calls
 * -start to schedule the rest of the sequence.
 *
 * @return Always YES.
 */
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    self.started = NO;

    NSNotificationCenter *center = [NSNotificationCenter defaultCenter];

    // Only truthy values of `started` pass the filter, so the initial NO is dropped.
    RACSignal *startedSignal = [RACObserve(self, started) filter:^BOOL(id value) {
        return [value boolValue];
    }];

    // One signal per notification name, merged into a single stream of actions.
    RACSignal *fooSignal = [center rac_addObserverForName:@"FOO" object:nil];
    RACSignal *barSignal = [center rac_addObserverForName:@"BAR" object:nil];
    RACSignal *helloSignal = [center rac_addObserverForName:@"HELLO" object:nil];
    RACSignal *worldSignal = [center rac_addObserverForName:@"WORLD" object:nil];
    RACSignal *actions = [RACSignal merge:@[fooSignal, barSignal, helloSignal, worldSignal]];

    RACSignal *bufferedActions = [actions enumerableBufferBefore:startedSignal];
    [bufferedActions subscribeNext:^(NSNotification *notification) {
        NSLog(@"Received Action : %@", notification.name);
    }];

    // These two are posted before `started` becomes true.
    NSLog(@"Dispatching 'FOO'");
    [center postNotificationName:@"FOO" object:nil];

    NSLog(@"Dispatching 'BAR'");
    [center postNotificationName:@"BAR" object:nil];

    [self start];
    return YES;
}

/**
 * Sets `started` to true on the main queue after a 5 second delay, then one
 * second later posts the 'HELLO' and 'WORLD' notifications.
 */
- (void)start
{
    NSLog(@"Waiting for ReactNative to mount..");

    dispatch_time_t mountTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5 * NSEC_PER_SEC));
    dispatch_after(mountTime, dispatch_get_main_queue(), ^{
        NSLog(@"ReactNative mounted");
        self.started = YES;

        // The follow-up delay is measured from the moment `started` was set.
        dispatch_time_t followUpTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC));
        dispatch_after(followUpTime, dispatch_get_main_queue(), ^{
            NSNotificationCenter *center = [NSNotificationCenter defaultCenter];

            NSLog(@"Dispatching 'HELLO'");
            [center postNotificationName:@"HELLO" object:nil];

            NSLog(@"Dispatching 'WORLD'");
            [center postNotificationName:@"WORLD" object:nil];
        });
    });
}

@end
//
// RACSignal+Buffer.m
// RXTesting
//
// Created by Atticus White on 2/1/17.
// Copyright © 2017 Atticus White. All rights reserved.
//
#import "RACSignal+Buffer.h"
#import <ReactiveCocoa/ReactiveCocoa.h>
@implementation RACSignal (Buffer)

/**
 * Returns a signal that holds back the receiver's values until `startSignal`
 * sends its first value. At that point the held values are sent to the
 * subscriber in arrival order; every later value from the receiver is
 * forwarded as it arrives.
 *
 * If the receiver completes before `startSignal` has fired, the held values
 * are sent and then the completion is forwarded. Errors from the receiver are
 * forwarded immediately; values still held at that point are not sent.
 *
 * A nil value from the receiver is delivered to the subscriber as
 * `[RACTupleNil tupleNil]`, because it has to be stored in an array while
 * buffered and the same substitution is applied when streaming.
 *
 * @param startSignal Signal whose first value opens the gate. Must not be nil.
 *                    Only its first value is observed.
 * @return A signal that buffers, flushes, and then passes through the
 *         receiver's values.
 */
- (RACSignal *)enumerableBufferBefore:(RACSignal *)startSignal
{
    NSCParameterAssert(startSignal != nil);

    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // `buffer` is both the storage and the lock: every read or write of
        // `buffer` and `signalAcquired`, and every sendNext:, happens inside
        // @synchronized (buffer). @synchronized is recursive, so a subscriber
        // that re-entrantly triggers the source on the same thread does not
        // deadlock.
        __block BOOL signalAcquired = NO;
        NSMutableArray *buffer = [NSMutableArray array];

        // Sends everything currently buffered, in order, and leaves the buffer empty.
        void (^flushBuffer)(void) = ^{
            @synchronized (buffer) {
                // Drain from a snapshot so values appended re-entrantly while
                // sending do not mutate the array being enumerated; the outer
                // loop picks them up afterwards, keeping arrival order.
                while (buffer.count > 0) {
                    NSArray *pending = [buffer copy];
                    [buffer removeAllObjects];
                    for (id value in pending) {
                        [subscriber sendNext:value];
                    }
                }
            }
        };

        // take:1 ends the start subscription after its first value. The block
        // cannot dispose `untilDisposable` itself: the variable would be
        // captured before it is assigned, i.e. as nil.
        RACDisposable *untilDisposable = [[startSignal take:1] subscribeNext:^(id x) {
            // Flush and open the gate atomically so no value can slip into
            // the buffer between the two steps and be left behind.
            @synchronized (buffer) {
                flushBuffer();
                signalAcquired = YES;
            }
        }];

        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            // NSMutableArray cannot hold nil, so nil is represented by RACTupleNil.
            id value = x ?: [RACTupleNil tupleNil];
            @synchronized (buffer) {
                if (signalAcquired) {
                    // Gate is open: stream straight through.
                    [subscriber sendNext:value];
                } else {
                    // Gate is closed: hold the value for the flush.
                    [buffer addObject:value];
                }
            }
        } error:^(NSError *error) {
            // Forward errors to the subscriber.
            [subscriber sendError:error];
        } completed:^{
            // Deliver anything still held before forwarding the completion.
            flushBuffer();
            [subscriber sendCompleted];
        }];

        // Tear down both subscriptions when the returned signal is disposed.
        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [untilDisposable dispose];
        }];
    }];
}

@end
@atticoos
Copy link
Author

atticoos commented Feb 2, 2017

screen shot 2017-02-01 at 10 26 29 pm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment