Skip to content

Instantly share code, notes, and snippets.

@jspahrsummers
Last active December 22, 2016 16:15
Show Gist options
  • Save jspahrsummers/9756674 to your computer and use it in GitHub Desktop.
Save jspahrsummers/9756674 to your computer and use it in GitHub Desktop.
Non-blocking mutual exclusion with readers and writers, for asynchronous operations that may span more than one GCD block (making a single concurrent GCD queue unsuitable).
//
// GHReadWriteQueue.h
// GitHub
//
// Created by Justin Spahr-Summers on 2014-03-24.
// Copyright (c) 2014 GitHub. All rights reserved.
//
#import <Foundation/Foundation.h>
/// A read-write lock that synchronizes work at the level of signals instead of
/// blocks, and without blocking any queues.
@interface GHReadWriteQueue : NSObject
/// Initializes the queue with a debugging name.
///
/// This is the designated initializer for this class.
///
/// name - The debug name for this queue. Must not be nil.
- (instancetype)initWithName:(NSString *)name;
/// Enqueues an operation that can run concurrently with other operations.
///
/// workSignal - A signal to subscribe to once a concurrent lock is obtained. This
/// signal should avoid performing synchronous work, as it will occupy
/// the queue until returning. Once the signal has sent `error` or
/// `completed`, the concurrent lock will be released. This argument
/// must not be nil.
///
/// Returns a signal that will enqueue `workSignal` once for each new
/// subscription, subscribe to it when a concurrent lock is obtained, then
/// forward all events.
- (RACSignal *)addConcurrentSignal:(RACSignal *)workSignal;
/// Enqueues an operation that must run serially with respect to all other
/// operations.
///
/// Exclusive operations have barrier semantics. Once an exclusive operation has
/// been enqueued, no operations enqueued thereafter will be allowed to begin
/// until it has finished executing.
///
/// workSignal - A signal to subscribe to once the exclusive lock is obtained. This
/// signal should avoid performing synchronous work, as it will occupy
/// the queue until returning. Once the signal has sent `error` or
/// `completed`, the exclusive lock will be released. This argument
/// must not be nil.
///
/// Returns a signal that will enqueue `workSignal` once for each new
/// subscription, subscribe to it when the exclusive lock is obtained, then
/// forward all events.
- (RACSignal *)addExclusiveSignal:(RACSignal *)workSignal;
@end
//
// GHReadWriteQueue.m
// GitHub
//
// Created by Justin Spahr-Summers on 2014-03-24.
// Copyright (c) 2014 GitHub. All rights reserved.
//
#import "GHReadWriteQueue.h"
@interface GHReadWriteQueue ()
// The name for this queue.
@property (nonatomic, copy, readonly) NSString *name;
// A concurrent GCD queue upon which new operations are scheduled.
//
// In other words, a barrier block on this queue (while running) will prevent
// new operations from being started.
@property (nonatomic, readonly) dispatch_queue_t operationSchedulingQueue;
// A group entered whenever a concurrent operation has been started.
//
// When this group is empty, there are no concurrent operations in-flight.
// However, the `operationSchedulingQueue` must be suspended or blocked to
// guarantee that condition remains true.
@property (nonatomic, readonly) dispatch_group_t concurrentOperationsGroup;
@end
@implementation GHReadWriteQueue
#pragma mark Lifecycle
- (instancetype)initWithName:(NSString *)name {
NSParameterAssert(name != nil);
self = [super init];
if (self == nil) return nil;
_name = [name copy];
_concurrentOperationsGroup = dispatch_group_create();
NSString *queueLabel = [self.name stringByAppendingString:@".operationSchedulingQueue"];
_operationSchedulingQueue = dispatch_queue_create(queueLabel.UTF8String, DISPATCH_QUEUE_CONCURRENT);
// Starting new operations is low priority compared to finishing up existing
// work.
dispatch_set_target_queue(_operationSchedulingQueue, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
return self;
}
- (void)dealloc {
if (_concurrentOperationsGroup != NULL) {
dispatch_release(_concurrentOperationsGroup);
_concurrentOperationsGroup = NULL;
}
if (_operationSchedulingQueue != NULL) {
dispatch_release(_operationSchedulingQueue);
_operationSchedulingQueue = NULL;
}
}
#pragma mark Queuing
- (RACSignal *)addConcurrentSignal:(RACSignal *)workSignal {
NSParameterAssert(workSignal != nil);
return [[RACSignal
createSignal:^(id<RACSubscriber> subscriber) {
RACDisposable *disposable = [[RACDisposable alloc] init];
// Attempt to schedule a new concurrent operation. This queue
// will be suspended while an exclusive operation is running, so
// the block may not run immediately.
dispatch_async(self.operationSchedulingQueue, ^{
if (disposable.disposed) return;
// Indicate that our operation is executing.
dispatch_group_enter(self.concurrentOperationsGroup);
// When the operation finishes, indicate that it's no longer
// executing.
//
// Order is important here! This should happen _after_ notifying
// the subscriber about termination events, in case it does any
// work that uses the repository.
void (^finished)(void) = ^{
dispatch_group_leave(self.concurrentOperationsGroup);
};
[workSignal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
finished();
} completed:^{
[subscriber sendCompleted];
finished();
}];
});
return disposable;
}]
setNameWithFormat:@"%@ %s", self, sel_getName(_cmd)];
}
- (RACSignal *)addExclusiveSignal:(RACSignal *)workSignal {
NSParameterAssert(workSignal != nil);
return [[RACSignal
createSignal:^(id<RACSubscriber> subscriber) {
RACDisposable *disposable = [[RACDisposable alloc] init];
// Wait for all outstanding operations to finish being scheduled.
dispatch_barrier_async(self.operationSchedulingQueue, ^{
if (disposable.disposed) return;
// Then, disable any further scheduling.
dispatch_suspend(self.operationSchedulingQueue);
// When the operation finishes, resume scheduling.
//
// Order is important here! This should happen _after_ notifying
// the subscriber about termination events, in case it does any
// work that uses the repository.
void (^finished)(void) = ^{
dispatch_resume(self.operationSchedulingQueue);
};
// Once all concurrent operations finish, start a new exclusive
// operation.
//
// We know there are no exclusive operations in flight, because
// the `operationSchedulingQueue` would have been suspended (and
// this code therefore wouldn't be running) if that were the
// case.
dispatch_group_notify(self.concurrentOperationsGroup, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
if (disposable.disposed) {
finished();
return;
}
[workSignal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
finished();
} completed:^{
[subscriber sendCompleted];
finished();
}];
});
});
return disposable;
}]
setNameWithFormat:@"%@ %s", self, sel_getName(_cmd)];
}
#pragma mark NSObject
- (NSString *)description {
return [NSString stringWithFormat:@"<%@: %p>{ name = %@ }", self.class, self, self.name];
}
@end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment