@jspahrsummers
Last active December 29, 2015 03:59
Using -flatten:withPolicy: (from ReactiveCocoa 3.0) and related operators to deal with producers that can outstrip consumers
// Asynchronously loads UIImages from the given URLs, but throttles the loading
// to keep memory and CPU usage sane.
- (RACSignal *)loadImagesAtURLs:(NSArray *)imageURLs {
    // Map each URL to a signal of work. The result is a signal of work signals.
    return [[imageURLs.rac_signal
        map:^(NSURL *imageURL) {
            return [[[[NSURLConnection
                // Load the URL asynchronously.
                rac_sendAsynchronousRequest:[NSURLRequest requestWithURL:imageURL]]
                reduceEach:^(NSURLResponse *response, NSData *data) {
                    return data;
                }]
                tryMap:^(NSData *data, NSError **error) {
                    return [UIImage imageWithData:data];
                }]
                flattenMap:^(UIImage *original) {
                    return [[self
                        // A hypothetical method returning a `RACSignal` that
                        // will scale down `original`.
                        resizeImage:original]
                        // Perform the resizing in the background.
                        subscribeOn:[RACScheduler scheduler]];
                }];
        }]
        // Flatten the signal of work signals, resulting in _one_ signal of work.
        //
        // The parameters here ensure that we process (load and resize) no more
        // than 3 images at a time, and enqueue the rest.
        flatten:3 withPolicy:RACSignalFlattenPolicyQueue];
}
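
// A possible call site, sketched for illustration only (it is not part of the
// original gist). `imageURLs` and the mutable array `self.thumbnails` are
// hypothetical names. Note that, as with any flattened signal, an error from
// a single image would be expected to terminate the whole signal.
[[[self
    loadImagesAtURLs:imageURLs]
    // Deliver results on the main thread before touching UI state.
    deliverOn:RACScheduler.mainThreadScheduler]
    subscribeNext:^(UIImage *resized) {
        [self.thumbnails addObject:resized];
    } error:^(NSError *error) {
        NSLog(@"Image loading failed: %@", error);
    } completed:^{
        NSLog(@"Finished loading images");
    }];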

// Maps each URL to a signal for asynchronously reading that URL.
- (RACSignal *)fileReadingSignalsForURLs:(NSArray *)fileURLs {
    // Map each file URL to a "cold" signal that will read from the file.
    return [fileURLs.rac_signal map:^(NSURL *fileURL) {
        return [[RACSignal
            // Create a signal that reads from `fileURL` once subscribed to (but not
            // before).
            create:^(id<RACSubscriber> subscriber) {
                NSError *error = nil;
                NSData *data = [NSData dataWithContentsOfURL:fileURL options:0 error:&error];
                if (data == nil) {
                    [subscriber sendError:error];
                } else {
                    [subscriber sendNext:data];
                    [subscriber sendCompleted];
                }
            }]
            // Perform the reading in the background.
            subscribeOn:[RACScheduler scheduler]];
    }];
}
/*
* Returning a signal of signals (above) allows the caller to determine the
* throttling strategy:
*/
RACSignal *readSignals = [self fileReadingSignalsForURLs:URLs];
// To start all reads as quickly as possible (with no limit on concurrency):
[[readSignals flatten] subscribeNext:^(NSData *data) {
// …
}];
// To read files one-by-one:
[[readSignals concat] subscribeNext:^(NSData *data) {
// …
}];
// To read up to 3 files at a time:
[[readSignals flatten:3 withPolicy:RACSignalFlattenPolicyQueue] subscribeNext:^(NSData *data) {
// …
}];
// Pushes data snapshots to a web service as they arrive, cancelling and
// discarding older snapshots if we fall behind.
- (RACSignal *)syncSnapshots:(RACSignal *)snapshots {
    // Map each snapshot to a network request.
    return [[snapshots
        map:^(NSData *snapshot) {
            NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:saveSnapshotURL];
            request.HTTPMethod = @"POST";
            request.HTTPBody = snapshot;

            // Send the POST asynchronously.
            return [NSURLConnection rac_sendAsynchronousRequest:request];
        }]
        // Limit to 3 connections at a time. If a new snapshot arrives while
        // we're already saving 3, the oldest will be cancelled and discarded.
        flatten:3 withPolicy:RACSignalFlattenPolicyDisposeEarliest];
}
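
// A possible call site, sketched for illustration only (it is not part of the
// original gist). `self.snapshotSignal` is a hypothetical signal of NSData
// snapshots. Nothing is uploaded until the returned signal is subscribed to.
[[self
    syncSnapshots:self.snapshotSignal]
    subscribeError:^(NSError *error) {
        NSLog(@"Snapshot sync failed: %@", error);
    }];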