Last active
February 2, 2017 03:54
-
-
Save atticoos/aa5b24aed7ac7193f1760eb98da31df5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// AppDelegate.m | |
// RXTesting | |
// | |
// Created by Atticus White on 2/1/17. | |
// Copyright © 2017 Atticus White. All rights reserved. | |
// | |
#import "AppDelegate.h" | |
#import <ReactiveCocoa/ReactiveCocoa.h> | |
#import "RACSignal+Buffer.h" | |
@interface AppDelegate ()

/// Private readiness flag. Reset to false at launch and set to true by -start
/// once its delayed block runs; the implementation observes it through RACObserve.
@property (nonatomic, assign) BOOL started;

@end
/**
 * Objective: Buffer the values of a signal (the source signal) until another signal (the start signal) has fired.
 * Until the start signal fires, values from the source signal are queued in a buffer.
 * Once the start signal fires, the buffered values are sent one-by-one to the subscriber.
 * After that, every new value from the source signal streams through as normal.
 *
 * Example:
 *
 *   SourceSignal   StartSignal   Buffer        Subscriber
 *   'A'            -             ['A']         -
 *   'B'            -             ['A', 'B']    -
 *   -              1             []            'A' then 'B'
 *   'C'            -             []            'C'
 *   'D'            -             []            'D'
 *
 */
@implementation AppDelegate

/**
 * Wires up the buffering demo at launch.
 *
 * Builds a merged signal of four named notifications, gates it behind the
 * `started` flag with -enumerableBufferBefore:, logs every notification that
 * reaches the subscriber, posts "FOO" and "BAR" right away, and then calls
 * -start to flip the flag later.
 *
 * @return Always YES.
 */
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    self.started = NO;

    NSNotificationCenter *center = [NSNotificationCenter defaultCenter];

    // Only truthy values of `started` pass the filter.
    RACSignal *hasStarted = [RACObserve(self, started) filter:^BOOL(id flag) {
        return [flag boolValue];
    }];

    // One notification signal per action name, merged in declaration order.
    NSMutableArray *notificationSignals = [NSMutableArray array];
    for (NSString *actionName in @[@"FOO", @"BAR", @"HELLO", @"WORLD"]) {
        [notificationSignals addObject:[center rac_addObserverForName:actionName object:nil]];
    }
    RACSignal *actions = [RACSignal merge:notificationSignals];

    RACSignal *gatedActions = [actions enumerableBufferBefore:hasStarted];
    [gatedActions subscribeNext:^(NSNotification *notification) {
        NSLog(@"Received Action : %@", notification.name);
    }];

    // These two are posted before `started` is set to YES.
    NSLog(@"Dispatching 'FOO'");
    [center postNotificationName:@"FOO" object:nil];
    NSLog(@"Dispatching 'BAR'");
    [center postNotificationName:@"BAR" object:nil];

    [self start];
    return YES;
}

/**
 * Simulates a delayed mount: after 5 seconds on the main queue sets `started`
 * to YES, then 1 second later posts "HELLO" and "WORLD".
 */
- (void)start
{
    NSLog(@"Waiting for ReactNative to mount..");

    dispatch_time_t mountTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5 * NSEC_PER_SEC));
    dispatch_after(mountTime, dispatch_get_main_queue(), ^{
        NSLog(@"ReactNative mounted");
        self.started = YES;

        dispatch_time_t followUpTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC));
        dispatch_after(followUpTime, dispatch_get_main_queue(), ^{
            NSLog(@"Dispatching 'HELLO'");
            [[NSNotificationCenter defaultCenter] postNotificationName:@"HELLO" object:nil];
            NSLog(@"Dispatching 'WORLD'");
            [[NSNotificationCenter defaultCenter] postNotificationName:@"WORLD" object:nil];
        });
    });
}

@end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// RACSignal+Buffer.m | |
// RXTesting | |
// | |
// Created by Atticus White on 2/1/17. | |
// Copyright © 2017 Atticus White. All rights reserved. | |
// | |
#import "RACSignal+Buffer.h" | |
#import <ReactiveCocoa/ReactiveCocoa.h> | |
@implementation RACSignal (Buffer)

/**
 * Returns a signal that holds back the receiver's values until `startSignal`
 * sends its first value, then replays the held values in order and forwards
 * every later value as it arrives.
 *
 * - Values received before the start signal fires are queued.
 * - If the receiver completes before the start signal fires, the queued values
 *   are delivered and then completion is forwarded.
 * - Errors from the receiver are forwarded immediately; queued values are not
 *   delivered in that case.
 * - nil values are preserved: they are stored as a sentinel while queued and
 *   delivered to the subscriber as nil.
 *
 * @param startSignal The gate signal. Must not be nil. Only its first value is
 *                    used; the subscription to it ends after that value.
 * @return A new signal applying the buffering described above per subscription.
 */
- (RACSignal *)enumerableBufferBefore:(RACSignal *)startSignal
{
    NSCParameterAssert(startSignal != nil);

    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Both flags below are only touched while holding the `buffer` monitor.
        __block BOOL signalAcquired = NO;
        NSMutableArray *buffer = [NSMutableArray array];

        // Delivers and removes every queued value. Callers must hold the
        // `buffer` monitor. Values are popped one at a time so the array is
        // never mutated while being enumerated, even if delivery re-enters
        // this signal on the same thread and queues another value.
        void (^drainBuffer)(void) = ^{
            while (buffer.count > 0) {
                id queued = buffer.firstObject;
                [buffer removeObjectAtIndex:0];
                // Unwrap the nil sentinel used for storage in the array.
                [subscriber sendNext:(queued == RACTupleNil.tupleNil ? nil : queued)];
            }
        };

        // Only the first value of the start signal matters. -take: ends the
        // upstream subscription after that value, including when the value is
        // delivered synchronously during subscription.
        RACDisposable *startDisposable = [[startSignal take:1] subscribeNext:^(id x) {
            // Flushing and opening the gate happen atomically with respect to
            // the source subscription, so no value can slip into the buffer
            // after the flush and be left behind.
            @synchronized (buffer) {
                drainBuffer();
                signalAcquired = YES;
            }
        }];

        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
            @synchronized (buffer) {
                if (signalAcquired) {
                    // Gate is open: forward the value untouched.
                    [subscriber sendNext:x];
                } else {
                    // Gate is closed: queue the value (arrays cannot hold nil).
                    [buffer addObject:x ?: RACTupleNil.tupleNil];
                }
            }
        } error:^(NSError *error) {
            // Forward errors to the subscriber.
            [subscriber sendError:error];
        } completed:^{
            // Deliver anything still queued before forwarding completion.
            @synchronized (buffer) {
                drainBuffer();
            }
            [subscriber sendCompleted];
        }];

        // Tear down both subscriptions when the subscriber goes away.
        return [RACDisposable disposableWithBlock:^{
            [sourceDisposable dispose];
            [startDisposable dispose];
        }];
    }];
}

@end
Author
atticoos
commented
Feb 2, 2017
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment