Last active
October 28, 2020 08:00
-
-
Save luqmana/a40c879e6bdcfe3ca4b67948302dd337 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Per-thread parking primitive for Windows.
///
/// The owning thread blocks in `park()` / `park_timeout()`; any thread may
/// release it with `unpark()`.
#[derive(Debug)]
pub struct Parker {
    /// Current parking state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicI8,
    /// Lazily created event object, used only when `WaitOnAddress` is not
    /// available. Holds `INVALID_HANDLE_VALUE` (truncated to 32 bits) until
    /// the first call that needs the event.
    ///
    /// The bottom 2 bits of a HANDLE are always 0, so the state could be
    /// stuffed in there if one really wanted to get fancy:
    /// https://devblogs.microsoft.com/oldnewthing/20050121-00/?p=36633
    event_handle: AtomicU32,
}
impl Parker { | |
pub fn new() -> Self { | |
Self { | |
state: AtomicI8::new(EMPTY), | |
event_handle: AtomicU32::new(c::INVALID_HANDLE_VALUE as u32), | |
} | |
} | |
// Assumes this is only called by the thread that owns the Parker, | |
// which means that `self.state != PARKED`. | |
pub unsafe fn park(&self) { | |
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the | |
// first case. | |
if self.state.fetch_sub(1, Acquire) == NOTIFIED { | |
return; | |
} | |
if c::WaitOnAddress::is_available() { | |
loop { | |
// Wait for something to happen, assuming it's still set to PARKED. | |
c::WaitOnAddress(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, c::INFINITE); | |
// Change NOTIFIED=>EMPTY but leave PARKED alone. | |
if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED { | |
// Actually woken up by unpark(). | |
return; | |
} else { | |
// Spurious wake up. We loop to try again. | |
} | |
} | |
} else { | |
// Wait for unpark() to produce this event. | |
c::WaitForSingleObject(self.event_handle(), c::INFINITE); | |
// Set the state back to EMPTY (from either PARKED or NOTIFIED). | |
// Note that we don't just write EMPTY, but use swap() to also | |
// include an acquire-ordered read to synchronize with unpark()'s | |
// release-ordered write. | |
self.state.swap(EMPTY, Acquire); | |
} | |
} | |
// Assumes this is only called by the thread that owns the Parker, | |
// which means that `self.state != PARKED`. | |
pub unsafe fn park_timeout(&self, timeout: Duration) { | |
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the | |
// first case. | |
if self.state.fetch_sub(1, Acquire) == NOTIFIED { | |
return; | |
} | |
if c::WaitOnAddress::is_available() { | |
// Wait for something to happen, assuming it's still set to PARKED. | |
c::WaitOnAddress( | |
self.ptr(), | |
&PARKED as *const _ as c::LPVOID, | |
1, | |
dur2timeout(timeout), | |
); | |
// Set the state back to EMPTY (from either PARKED or NOTIFIED). | |
// Note that we don't just write EMPTY, but use swap() to also | |
// include an acquire-ordered read to synchronize with unpark()'s | |
// release-ordered write. | |
if self.state.swap(EMPTY, Acquire) == NOTIFIED { | |
// Actually woken up by unpark(). | |
} else { | |
// Timeout or spurious wake up. | |
// We return either way, because we can't easily tell if it was the | |
// timeout or not. | |
} | |
} else { | |
// Wait for unpark() to produce this event. | |
let unparked = c::WaitForSingleObject(self.event_handle(), dur2timeout(timeout)) | |
== c::WAIT_OBJECT_0; | |
// Set the state back to EMPTY (from either PARKED or NOTIFIED). | |
let prev_state = self.state.swap(EMPTY, Acquire); | |
if !unparked && prev_state == NOTIFIED { | |
// Timed out (or error) | |
} | |
} | |
} | |
pub fn unpark(&self) { | |
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and | |
// wake the thread in the first case. | |
// | |
// Note that even NOTIFIED=>NOTIFIED results in a write. This is on | |
// purpose, to make sure every unpark() has a release-acquire ordering | |
// with park(). | |
if self.state.swap(NOTIFIED, Release) == PARKED { | |
if c::WakeByAddressSingle::is_available() { | |
unsafe { | |
c::WakeByAddressSingle(self.ptr()); | |
} | |
} else { | |
unsafe { | |
c::SetEvent(self.event_handle()); | |
} | |
} | |
} | |
} | |
fn ptr(&self) -> c::LPVOID { | |
&self.state as *const _ as c::LPVOID | |
} | |
fn event_handle(&self) -> c::HANDLE { | |
const INVALID: u32 = !0; // c::INVALID_HANDLE_VALUE | |
match self.event_handle.load(Relaxed) { | |
INVALID => { | |
let handle = unsafe { | |
c::CreateEventW( | |
ptr::null_mut(), | |
0, // Auto-reset event object | |
0, // Initially nonsignaled | |
ptr::null(), | |
) | |
}; | |
if handle.is_null() { | |
panic!("Unable to create event handle: error {}", unsafe { | |
c::GetLastError() | |
}); | |
} | |
match self | |
.event_handle | |
.compare_exchange(INVALID, handle as u32, Relaxed, Relaxed) | |
{ | |
Ok(_) => handle, | |
Err(h) => { | |
// Lost the race to another thread initializing HANDLE before we did. | |
// Closing our handle and using theirs instead. | |
unsafe { | |
c::CloseHandle(handle); | |
} | |
h as c::HANDLE | |
} | |
} | |
} | |
handle => handle as c::HANDLE, | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment