//! Fast and small locks: WebKit's ParkingLot in Zig.
//! Based on the design described in https://webkit.org/blog/6161/locking-in-webkit/
//! Unlike a naive port, this implementation automatically reclaims the memory of
//! superseded hash tables instead of leaking them.
const std = @import("std");
const futex = std.Thread.Futex;
const Allocator = std.mem.Allocator;
const Atomic = std.atomic.Atomic;
const assert = std.debug.assert;
/// Per-thread parking state; one instance lives in thread-local storage below.
const ThreadData = struct {
shouldPark: bool = false,
futexMark: Atomic(u32) = Atomic(u32).init(0),
next: ?*ThreadData = null,
tail: ?*ThreadData = null,
firstTimeParking: bool = true,
};
threadlocal var perThreadData = ThreadData {};
var gParkingLot: ?ParkingLot = null;
pub const ParkingLot = struct {
alloc: *Allocator,
lot: *Lot,
const Self = @This();
const hashCtx = std.array_hash_map.AutoContext(usize) {};
fn init(alloc: *Allocator) Allocator.Error!Self {
var lot = try Lot.init(alloc, 1);
errdefer lot.deinit();
return Self {
.alloc = alloc,
.lot = lot,
};
}
/// Call this function once when the program starts.
pub fn initGlobal(alloc: *Allocator) Allocator.Error!void {
if (gParkingLot == null) {
gParkingLot = try Self.init(alloc);
}
}
fn deinit(self: *Self) void {
self.lot.freeAll();
self.lot.deinit();
}
/// Call this function when the program is about to exit.
pub fn deinitGlobal() void {
if (gParkingLot) |*g| {
g.deinit();
gParkingLot = null;
}
}
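// A minimal sketch of program-level setup (the GeneralPurposeAllocator here is
// just one choice; any allocator works):
//
//     pub fn main() !void {
//         var gpa = std.heap.GeneralPurposeAllocator(.{}){};
//         defer _ = gpa.deinit();
//         try ParkingLot.initGlobal(&gpa.allocator);
//         defer ParkingLot.deinitGlobal();
//         // ... spawn threads that use Lock / Condition ...
//     }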
const Lot = struct {
refcnt: Atomic(usize),
buckets: []*Bucket,
alloc: *Allocator,
old: ?*Lot,
pub fn init(alloc: *Allocator, bucketN: usize) Allocator.Error!*Lot {
return expandFrom(alloc, bucketN, null);
}
fn expandFrom(alloc: *Allocator, bucketN: usize, old: ?*Lot) Allocator.Error!*Lot {
var buckets = try expandBucketN(alloc, bucketN, if (old) |oldObj| oldObj.buckets else &.{});
errdefer destroyBuckets(alloc, buckets);
var object = try alloc.create(Lot);
errdefer alloc.destroy(object);
object.* = Lot {
.refcnt = Atomic(usize).init(if (old) |oldObj| oldObj.refcnt.value else 0),
.buckets = buckets,
.alloc = alloc,
.old = old,
};
return object;
}
fn expandBucketN(alloc: *Allocator, bucketN: usize, old: []*Bucket) Allocator.Error![]*Bucket {
assert(bucketN >= old.len);
var newSli = try alloc.alloc(*Bucket, bucketN);
errdefer alloc.free(newSli);
std.mem.copy(*Bucket, newSli, old);
for (newSli[old.len..bucketN]) |*ptr, i| {
errdefer {
for (newSli[old.len..old.len+i]) |obj| {
alloc.destroy(obj);
}
}
ptr.* = try alloc.create(Bucket);
ptr.*.* = Bucket {
.head = null,
.tail = null,
};
}
return newSli;
}
fn destroyBuckets(alloc: *Allocator, buckets: []*Bucket) void {
for (buckets) |bptr| {
alloc.destroy(bptr);
}
alloc.free(buckets);
}
pub fn swap(self: *Lot, lockAddr: usize, beSwapped: *ThreadData, to: *ThreadData, validation: anytype, beforeSleep: anytype, timeout: ?u64) bool {
{
var bucket = self.getBucket(lockAddr, self.buckets.len);
bucket.safelock();
defer bucket.safeunlock();
if (@typeInfo(@TypeOf(validation)) != .Null) {
if (!invokeClosure(bool, validation)) {
return false;
}
}
var node = nodeFind: {
var current = bucket.head;
while (current) |cand| : (current = cand.next) {
if (cand.lockAddress == lockAddr and cand.thread == beSwapped) {
break :nodeFind cand;
}
} else {
break :nodeFind null;
}
};
if (node) |n| {
// Arm our own park state while the bucket is still locked, so the wakeup
// for `to` cannot be lost between the node swap and the wait below.
to.futexMark.store(0, .SeqCst);
to.shouldPark = true;
n.thread = to;
beSwapped.futexMark.store(1, .SeqCst);
beSwapped.shouldPark = false;
futex.wake(&beSwapped.futexMark, 1);
} else {
return false;
}
}
if (@typeInfo(@TypeOf(beforeSleep)) != .Null) {
invokeClosure(void, beforeSleep);
}
while (to.shouldPark) {
futex.wait(&to.futexMark, 0, timeout) catch return false;
}
return true;
}
/// Enqueue the thread and park it.
/// The thread waits for a wake on `thread.futexMark`.
pub fn enqueue(self: *Lot, lockAddr: usize, thread: *ThreadData, validation: anytype, beforeSleep: anytype, timeout: ?u64) Allocator.Error!bool {
{
var bucket = self.getBucket(lockAddr, self.buckets.len);
bucket.safelock();
defer bucket.safeunlock();
if (@typeInfo(@TypeOf(validation)) != .Null) {
if (!invokeClosure(bool, validation)) {
return false;
}
}
_ = self.refcnt.fetchAdd(1, .SeqCst);
// Arm the park state while the bucket is locked, so a wakeup arriving
// between queue insertion and the futex wait below cannot be lost.
thread.futexMark.store(0, .SeqCst);
thread.shouldPark = true;
var node = try self.alloc.create(WakeRequest);
errdefer self.alloc.destroy(node);
node.* = WakeRequest {
.lockAddress = lockAddr,
.thread = thread,
.next = null,
.prev = null,
};
if (bucket.tail) |tail| {
tail.next = node;
node.prev = tail;
} else {
bucket.head = node;
}
bucket.tail = node;
assert(bucket.head != null);
}
if (@typeInfo(@TypeOf(beforeSleep)) != .Null) {
invokeClosure(void, beforeSleep);
}
while (thread.shouldPark) {
futex.wait(&thread.futexMark, 0, timeout) catch return false;
}
return true;
}
fn getBucket(self: *Lot, lockAddr: usize, bucketN: usize) *Bucket {
return self.getBucketByHash(getHashOf(lockAddr), bucketN);
}
fn getHashOf(lockAddr: usize) u32 {
return hashCtx.hash(lockAddr);
}
fn getBucketByHash(self: *Lot, hash: u32, bucketN: usize) *Bucket {
const bukpos = hash % bucketN;
return self.buckets[bukpos];
}
fn findNodeOf(node: ?*WakeRequest, lockAddr: usize) ?*WakeRequest {
var curr = node;
while (curr) |nncurr| : (curr = nncurr.next) {
if (nncurr.lockAddress == lockAddr) {
break;
}
}
return curr;
}
fn hasNode(self: *Lot, lockAddr: usize, node: *WakeRequest) bool {
var curr = self.getBucket(lockAddr, self.buckets.len).head;
while (curr != null and curr != node) : (curr = curr.?.next) {}
return curr != null;
}
fn dequeueFromBucket(bucket: *Bucket, lockAddr: usize) ?*WakeRequest {
if (bucket.head) |bhead| {
if (Lot.findNodeOf(bhead, lockAddr)) |node| {
if (node.prev) |prev| {
prev.next = node.next;
}
if (node.next) |next| {
next.prev = node.prev;
}
if (bucket.head == node) {
bucket.head = node.next;
}
if (bucket.tail == node) {
bucket.tail = node.prev;
}
return node;
} else return null;
} else return null;
}
fn deref(self: *Lot, lockAddr: usize, node: *WakeRequest) void {
if (self.hasNode(lockAddr, node)) {
_ = self.refcnt.fetchSub(1, .SeqCst);
}
if (self.old) |old| {
old.deref(lockAddr, node);
if (old.refcnt.load(.SeqCst) == 0) {
// Whoever wins the race to detach the old lot reclaims it. Only the Lot
// objects and their bucket tables are freed here: the Bucket objects
// themselves are shared with newer lots and must stay alive.
if (@cmpxchgStrong(?*Lot, &self.old, old, null, .SeqCst, .SeqCst) == null) {
var dead: ?*Lot = old;
while (dead) |d| {
dead = d.old;
d.alloc.free(d.buckets);
d.alloc.destroy(d);
}
}
}
}
}
pub fn dequeue(self: *Lot, lockAddr: usize, callback: anytype) ?*ThreadData {
const hash = getHashOf(lockAddr);
{ // Lock the corresponding bucket in every lot, old and new.
var current = self;
while (true) {
var bucket = current.getBucketByHash(hash, current.buckets.len);
bucket.safelock();
current = current.old orelse break;
}
}
defer { // Unlock all locked buckets
var current = self;
while (true) {
var bucket = current.getBucketByHash(hash, current.buckets.len);
bucket.safeunlock();
current = current.old orelse break;
}
}
{
var current = self;
var foundNode: ?*WakeRequest = nodeFind: {
while (true) {
var bucket = current.getBucketByHash(hash, current.buckets.len);
if (dequeueFromBucket(bucket, lockAddr)) |node| {
node.thread.futexMark.store(1, .SeqCst);
node.thread.shouldPark = false;
futex.wake(&node.thread.futexMark, 1);
if (@typeInfo(@TypeOf(callback)) != .Null) {
invokeClosure1(void, callback, UnparkResult {
.didUnparkThread = true,
.mayHaveMoreThread = findNodeOf(node.next, lockAddr) != null,
});
}
break :nodeFind node;
}
current = current.old orelse break :nodeFind null;
}
};
if (foundNode) |node| {
defer self.alloc.destroy(node);
self.deref(lockAddr, node);
return node.thread;
} else {
if (@typeInfo(@TypeOf(callback)) != .Null) {
invokeClosure1(void, callback, UnparkResult {
.didUnparkThread = false,
.mayHaveMoreThread = false,
});
}
return null;
}
}
}
/// Deinitialise the structure.
/// This function does not free the memory used by queue nodes;
/// see freeAll for that.
fn deinit(self: *Lot) void {
const bucketDestroyStart = if (self.old) |oldLot| oldLot.buckets.len else 0;
if (self.old) |oldLot| {
oldLot.deinit();
}
// Only destroy the buckets this lot created itself: the shared prefix is
// destroyed by oldLot.deinit() above, which avoids a double free.
for (self.buckets[bucketDestroyStart..self.buckets.len]) |buk| {
self.alloc.destroy(buk);
}
self.alloc.free(self.buckets);
self.alloc.destroy(self);
}
/// Free all memory used by queued nodes. You don't need to call this on old
/// lots, since their buckets are reused by newer lots.
/// Warning: this function locks every bucket and never releases the locks.
/// That helps to identify threads that never left the queue when the program exits.
fn freeAll(self: *Lot) void {
for (self.buckets) |buk| {
buk.safelock();
}
for (self.buckets) |buk| {
var curr = buk.head;
while (curr) |node| {
var next = node.next;
self.alloc.destroy(node);
curr = next;
}
}
}
/// Return a resized lot. You must install the new lot immediately.
/// After the new lot is in place, use completeResizing to unlock the buckets.
fn resize(self: *Lot, newBucketN: usize) Allocator.Error!*Lot {
assert(newBucketN > self.buckets.len); // TODO: support shrinking the bucket array
for (self.buckets) |buk| {
buk.safelock();
}
errdefer {
for (self.buckets) |buk| {
buk.safeunlock();
}
}
var newLot = try Lot.expandFrom(self.alloc, newBucketN, self);
errdefer newLot.deinit();
return newLot;
}
fn completeResizing(self: *Lot) void {
for (self.buckets) |buk| {
buk.safeunlock();
}
}
};
const Bucket = struct {
head: ?*WakeRequest,
tail: ?*WakeRequest,
lock: WordLock = WordLock {},
usingBy: ?*const ThreadData = null,
// A bucket may appear in several lots at once (new lots reuse old buckets),
// so the lock is made reentrant for the thread that already holds it.
fn safelock(self: *Bucket) void {
if (self.usingBy != &perThreadData) {
self.lock.lock();
self.usingBy = &perThreadData;
}
}
fn safeunlock(self: *Bucket) void {
if (self.usingBy == &perThreadData) {
self.usingBy = null;
self.lock.unlock();
} else if (self.usingBy != null) {
unreachable; // unlocking a bucket that another thread has locked
}
}
};
const WakeRequest = struct {
lockAddress: usize,
thread: *ThreadData,
next: ?*WakeRequest = null,
prev: ?*WakeRequest = null,
};
/// Grow the hash table the first time this thread parks, once the number of
/// parked threads exceeds one third of the bucket count.
fn prepareThread(self: *Self) void {
if (perThreadData.firstTimeParking) {
var refcnt = self.lot.refcnt.load(.SeqCst);
if (refcnt > @divTrunc(self.lot.buckets.len, 3)) {
var currentLot = @atomicLoad(*Lot, &self.lot, .SeqCst);
var newLot = currentLot.resize(currentLot.buckets.len * 2) catch return;
if (@cmpxchgStrong(*Lot, &self.lot, currentLot, newLot, .SeqCst, .SeqCst) == null) {
newLot.completeResizing();
} else {
// Another thread installed a new lot first: unlock the old buckets
// and throw away the table we built.
currentLot.completeResizing();
for (newLot.buckets[currentLot.buckets.len..]) |buk| {
newLot.alloc.destroy(buk);
}
newLot.alloc.free(newLot.buckets);
newLot.alloc.destroy(newLot);
}
}
perThreadData.firstTimeParking = false;
}
}
fn invokeClosure(comptime R: type, closure: anytype) R {
const T = @TypeOf(closure);
const info = @typeInfo(T);
if (info == .Fn) {
return closure();
} else if (info == .Struct) {
return closure.call();
} else if (info == .Optional) {
if (closure) |f| {
return invokeClosure(R, f);
} else unreachable;
} else {
@compileError("expected a function or a struct with a call() method");
}
}
fn invokeClosure1(comptime R: type, closure: anytype, arg1: anytype) R {
const T = @TypeOf(closure);
const info = @typeInfo(T);
if (info == .Fn) {
return closure(arg1);
} else if (info == .Struct) {
return closure.call(arg1);
} else if (info == .Optional) {
if (closure) |f| {
return invokeClosure1(R, f, arg1);
} else unreachable;
} else {
@compileError("closure expect a function or a structure with call method");
}
}
fn isNull(value: anytype) bool {
return @typeInfo(@TypeOf(value)) == .Null;
}
/// Park the current thread on `lockAddr` if `validation` passes.
/// Returns false if validation failed or the wait timed out.
fn parkConditionally(self: *Self, lockAddr: usize, validation: anytype, beforeSleep: anytype, timeout: ?u64) Allocator.Error!bool {
return try self.lot.enqueue(lockAddr, &perThreadData, validation, beforeSleep, timeout);
}
pub const UnparkResult = struct {
didUnparkThread: bool,
mayHaveMoreThread: bool,
};
fn unparkOne(self: *Self, lockAddr: usize, callback: anytype) void {
_ = self.lot.dequeue(lockAddr, callback);
}
fn unparkAll(self: *Self, lockAddr: usize) void {
while (self.lot.dequeue(lockAddr, null)) |_| {}
}
fn swap(self: *Self, lockAddr: usize, to: usize, validation: anytype, beforeSleep: anytype, timeout: ?u64) bool {
return self.lot.swap(lockAddr, &perThreadData, @intToPtr(*ThreadData, to), validation, beforeSleep, timeout);
}
fn getCurrentThread() usize {
return @ptrToInt(&perThreadData);
}
};
pub const getCurrentThreadId = ParkingLot.getCurrentThread;
/// A standalone lock the size of a single usize; also used for the bucket locks above.
pub const WordLock = struct {
word: Atomic(usize) = Atomic(usize).init(0),
const Self = @This();
const IS_LOCKED_MASK = @as(usize, 1);
const IS_QUEUE_LOCKED_MASK = @as(usize, 2);
const QUEUE_HEAD_MASK = @as(usize, 3);
pub fn lock(self: *Self) void {
if (@cmpxchgWeak(usize, &self.word.value, 0, IS_LOCKED_MASK, .SeqCst, .SeqCst)) |_| {
// quick path failed
self.lockSlow();
}
}
fn lockSlow(self: *Self) void {
var spinCount = @as(u8, 0);
const spinLimit = 40;
while (true) {
var currentWord = self.word.load(.SeqCst);
if ((currentWord & IS_LOCKED_MASK) == 0) {
assert((currentWord & IS_QUEUE_LOCKED_MASK) == 0);
if (@cmpxchgWeak(usize, &self.word.value, currentWord, currentWord | IS_LOCKED_MASK, .SeqCst, .SeqCst) == null) {
return;
}
}
if ((currentWord & ~QUEUE_HEAD_MASK) == 0 and spinCount < spinLimit) {
spinCount += 1;
std.os.sched_yield() catch {}; // best effort; ignore failure
continue;
}
var me = &perThreadData;
assert(!me.shouldPark);
assert(me.next == null);
assert(me.tail == null);
currentWord = self.word.load(.SeqCst);
if ((currentWord & IS_QUEUE_LOCKED_MASK) != 0
or (currentWord & IS_LOCKED_MASK) == 0
or (@cmpxchgWeak(usize, &self.word.value, currentWord, currentWord | IS_QUEUE_LOCKED_MASK, .SeqCst, .SeqCst) != null)) {
std.os.sched_yield() catch {};
continue;
}
me.futexMark.store(0, .SeqCst);
me.shouldPark = true;
var qhead = @intToPtr(?*ThreadData, currentWord & ~QUEUE_HEAD_MASK);
if (qhead) |head| {
head.tail.?.next = me;
head.tail = me;
assert(self.word.value == currentWord);
self.word.store(currentWord & ~IS_QUEUE_LOCKED_MASK, .SeqCst);
} else {
me.tail = me;
var newWord = currentWord;
newWord |= @ptrToInt(me);
newWord &= ~IS_QUEUE_LOCKED_MASK;
self.word.store(newWord, .SeqCst);
}
// Parked: wait until the unlocker hands over, then loop around and
// compete for the lock again.
while (me.shouldPark) {
futex.wait(&me.futexMark, 0, null) catch {};
}
assert(!me.shouldPark);
assert(me.next == null);
assert(me.tail == null);
}
}
pub fn unlock(self: *Self) void {
if (@cmpxchgWeak(usize, &self.word.value, IS_LOCKED_MASK, 0, .SeqCst, .SeqCst)) |_| {
// quick path failed
self.unlockSlow();
}
}
fn unlockSlow(self: *Self) void {
while (true) {
var currentWord = self.word.load(.SeqCst);
assert (currentWord & IS_LOCKED_MASK != 0);
if (currentWord == IS_LOCKED_MASK) {
if (@cmpxchgWeak(usize, &self.word.value, IS_LOCKED_MASK, 0, .SeqCst, .SeqCst) == null) {
return;
}
std.os.sched_yield() catch {};
continue;
}
if (currentWord & IS_QUEUE_LOCKED_MASK != 0) {
std.os.sched_yield() catch {};
continue;
}
assert((currentWord & ~QUEUE_HEAD_MASK) != 0);
if (@cmpxchgWeak(usize, &self.word.value, currentWord, currentWord | IS_QUEUE_LOCKED_MASK, .SeqCst, .SeqCst) == null) {
break;
}
}
var currentWord = self.word.load(.SeqCst);
assert(currentWord & IS_LOCKED_MASK != 0);
assert(currentWord & IS_QUEUE_LOCKED_MASK != 0);
var qhead = @intToPtr(*ThreadData, currentWord & ~QUEUE_HEAD_MASK);
var nextPtr = @ptrToInt(qhead.next);
if (qhead.next) |nextHead| {
nextHead.tail = qhead.tail;
}
assert(currentWord & IS_LOCKED_MASK != 0);
assert(currentWord & IS_QUEUE_LOCKED_MASK != 0);
assert((currentWord & (~QUEUE_HEAD_MASK)) == @ptrToInt(qhead));
var newWord = currentWord;
newWord &= ~IS_LOCKED_MASK;
newWord &= ~IS_QUEUE_LOCKED_MASK;
newWord &= QUEUE_HEAD_MASK; // drop the old head pointer...
newWord |= nextPtr; // ...and install the next head (0 if the queue is now empty)
self.word.store(newWord, .SeqCst);
qhead.next = null;
qhead.tail = null;
qhead.futexMark.store(1, .SeqCst);
qhead.shouldPark = false;
futex.wake(&qhead.futexMark, 1);
}
};
fn workLockThread(lock: *WordLock, n: *u32) !void {
lock.lock();
defer lock.unlock();
n.* = 0;
}
test "WordLock functional test" {
const t = std.testing;
var lock = WordLock {};
var shared: u32 = 1;
lock.lock();
var t0 = try std.Thread.spawn(.{}, workLockThread, .{&lock, &shared});
t0.detach();
try t.expectEqual(@as(u32, 1), shared);
lock.unlock();
while (shared == 1) {
std.os.sched_yield() catch {};
}
try t.expectEqual(@as(u32, 0), shared);
lock.lock();
lock.unlock();
}
fn getParkingLot() *ParkingLot {
if (gParkingLot) |*plot| {
plot.prepareThread();
return plot;
} else unreachable; // ParkingLot.initGlobal must be called first
}
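/// A "barging" lock in the sense of the WebKit post: an unparked thread is not
/// handed ownership, but must race with newcomers to reacquire the lock.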
pub const BargingLock = struct {
word: u8 = 0,
const Self = @This();
const IS_LOCKED_MASK = 1;
const IS_PARKED_MASK = 2;
pub fn lock(self: *Self) void {
if (@cmpxchgWeak(u8, &self.word, 0, IS_LOCKED_MASK, .SeqCst, .SeqCst) == null) {
return;
}
{
var i = @as(u8, 40);
while (i > 0) : (i -= 1) {
if (@atomicLoad(u8, &self.word, .SeqCst) & IS_PARKED_MASK != 0) {
break;
}
if (@cmpxchgWeak(u8, &self.word, 0, IS_LOCKED_MASK, .SeqCst, .SeqCst) == null) {
return;
}
std.os.sched_yield() catch {};
}
}
while (true) {
var word = @atomicLoad(u8, &self.word, .SeqCst);
if ((word & IS_LOCKED_MASK == 0) and (@cmpxchgWeak(u8, &self.word, word, word | IS_LOCKED_MASK, .SeqCst, .SeqCst) == null)) {
return;
}
_ = @cmpxchgWeak(u8, &self.word, word, word | IS_PARKED_MASK, .SeqCst, .SeqCst);
const validation = struct {
lock: *Self,
pub fn call(this: @This()) bool {
return @atomicLoad(u8, &this.lock.word, .SeqCst) == (IS_LOCKED_MASK | IS_PARKED_MASK);
}
} {.lock = self};
while (true) {
_ = getParkingLot().parkConditionally(@ptrToInt(self), validation, null, null) catch {
std.os.sched_yield() catch {};
continue;
};
break;
}
}
}
pub fn unlock(self: *Self) void {
if (@cmpxchgWeak(u8, &self.word, IS_LOCKED_MASK, 0, .SeqCst, .SeqCst) == null) {
return;
}
const callback = struct {
lock: *Self,
pub fn call(this: @This(), result: ParkingLot.UnparkResult) void {
if (result.mayHaveMoreThread) {
@atomicStore(u8, &this.lock.word, IS_PARKED_MASK, .SeqCst);
} else {
@atomicStore(u8, &this.lock.word, 0, .SeqCst);
}
}
} {.lock = self};
getParkingLot().unparkOne(@ptrToInt(self), callback);
}
};
pub const Lock = BargingLock;
fn bargingLockThread(lock: *BargingLock, n: *u32) !void {
while (true) {
lock.lock();
defer lock.unlock();
if (n.* > 0) {
n.* -= 1;
} else break;
}
}
test "BargingLock functional test" {
const t = std.testing;
try ParkingLot.initGlobal(t.allocator);
defer ParkingLot.deinitGlobal();
var lock = BargingLock {};
var shared: u32 = 1;
lock.lock();
var t0 = try std.Thread.spawn(.{}, bargingLockThread, .{&lock, &shared});
t0.detach();
try t.expectEqual(@as(u32, 1), shared);
lock.unlock();
while (shared == 1) {
std.os.sched_yield() catch {};
}
lock.lock();
try t.expectEqual(@as(u32, 0), shared);
lock.unlock();
}
fn assertLockPtr(comptime T: type) void {
if (comptime !(std.meta.trait.isPtrTo(.Struct)(T) and std.meta.trait.hasFunctions(@typeInfo(T).Pointer.child, .{"lock", "unlock"}))) {
@compileError("expect a pointer to a structure with declaration of lock() and unlock()");
}
}
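/// A condition variable that parks waiters on its own address.
/// wait() releases the given lock before sleeping and reacquires it before returning.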
pub const Condition = struct {
hasWaiters: bool = false,
const Self = @This();
fn BeforeSleepCallback(comptime L: type) type {
return struct {
cond: *Self,
lock: L,
pub fn call(this: @This()) void {
this.lock.unlock();
}
};
}
pub fn wait(self: *Self, lock: anytype) void {
assertLockPtr(@TypeOf(lock));
const validation = struct {
cond: *Self,
pub fn call(this: @This()) bool {
@atomicStore(bool, &this.cond.hasWaiters, true, .SeqCst);
return true;
}
} {.cond = self};
const beforeSleep = BeforeSleepCallback(@TypeOf(lock)) {.cond = self, .lock = lock};
while (true) {
_ = getParkingLot().parkConditionally(
@ptrToInt(self),
validation,
beforeSleep,
null,
) catch {
std.os.sched_yield() catch {};
continue;
};
break;
}
lock.lock();
}
pub fn notifyOne(self: *Self) void {
if (@atomicLoad(bool, &self.hasWaiters, .SeqCst)) {
const callback = struct {
cond: *Self,
pub fn call(this: @This(), result: ParkingLot.UnparkResult) void {
@atomicStore(bool, &this.cond.hasWaiters, result.mayHaveMoreThread, .SeqCst);
}
} {.cond = self};
getParkingLot().unparkOne(@ptrToInt(self), callback);
}
}
pub fn notifyAll(self: *Self) void {
if (@atomicLoad(bool, &self.hasWaiters, .SeqCst)) {
@atomicStore(bool, &self.hasWaiters, false, .SeqCst);
getParkingLot().unparkAll(@ptrToInt(self));
}
}
};
fn conditionThread(cond: *Condition, lock: *Lock, n: *u32, val: u32) void {
lock.lock();
defer lock.unlock();
cond.wait(lock);
n.* = val;
cond.notifyOne();
}
test "Condition functional test" {
const t = std.testing;
try ParkingLot.initGlobal(t.allocator);
defer ParkingLot.deinitGlobal();
var lock = Lock {};
var cond = Condition {};
var shared: u32 = 1;
var t0 = try std.Thread.spawn(.{}, conditionThread, .{&cond, &lock, &shared, 0});
t0.detach();
try t.expectEqual(@as(u32, 1), shared);
while (shared == 1) {
cond.notifyOne(); // the thread may not be waiting on the condition yet the first time notifyOne() is called
std.os.sched_yield() catch {};
}
try t.expectEqual(@as(u32, 0), shared);
}
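// The primitives above compose in the usual way. Below is a minimal sketch, not
// part of this gist's API: a one-shot event built from Lock and Condition.
const Event = struct {
lock: Lock = Lock {},
cond: Condition = Condition {},
flag: bool = false,
fn set(self: *Event) void {
self.lock.lock();
defer self.lock.unlock();
self.flag = true;
self.cond.notifyAll();
}
fn waitFor(self: *Event) void {
self.lock.lock();
defer self.lock.unlock();
// Re-check the flag on every wakeup, as condition waits may be spurious.
while (!self.flag) {
self.cond.wait(&self.lock);
}
}
};
fn eventThread(event: *Event, n: *u32) void {
event.waitFor();
n.* = 0;
}
test "Event sketch" {
const t = std.testing;
try ParkingLot.initGlobal(t.allocator);
defer ParkingLot.deinitGlobal();
var event = Event {};
var shared: u32 = 1;
var t0 = try std.Thread.spawn(.{}, eventThread, .{&event, &shared});
event.set();
t0.join();
try t.expectEqual(@as(u32, 0), shared);
}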