Skip to content

Instantly share code, notes, and snippets.

@luqmana
Last active October 28, 2020 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luqmana/a40c879e6bdcfe3ca4b67948302dd337 to your computer and use it in GitHub Desktop.
Save luqmana/a40c879e6bdcfe3ca4b67948302dd337 to your computer and use it in GitHub Desktop.
/// Thread parking primitive built on the Windows wait APIs used in the
/// `impl` block (`WaitOnAddress`/`WakeByAddressSingle` when available,
/// otherwise an event object waited on with `WaitForSingleObject`).
pub struct Parker {
/// Parking state. The methods move it between `EMPTY`, `PARKED` and
/// `NOTIFIED`; `park()` relies on `fetch_sub(1)` stepping
/// NOTIFIED=>EMPTY and EMPTY=>PARKED.
state: AtomicI8,
/// Event object handle stored as 32 bits. It starts out as
/// `INVALID_HANDLE_VALUE` (see `new()`) and is only replaced by a real
/// handle the first time `event_handle()` runs, i.e. on the fallback path.
event_handle: AtomicU32, // The bottom 2 bits of HANDLE are always 0 if you really wanted to get fancy and stuff state there
// https://devblogs.microsoft.com/oldnewthing/20050121-00/?p=36633
}
impl Parker {
pub fn new() -> Self {
Self {
state: AtomicI8::new(EMPTY),
event_handle: AtomicU32::new(c::INVALID_HANDLE_VALUE as u32),
}
}
// Assumes this is only called by the thread that owns the Parker,
// which means that `self.state != PARKED`.
pub unsafe fn park(&self) {
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
// first case.
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
if c::WaitOnAddress::is_available() {
loop {
// Wait for something to happen, assuming it's still set to PARKED.
c::WaitOnAddress(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, c::INFINITE);
// Change NOTIFIED=>EMPTY but leave PARKED alone.
if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
// Actually woken up by unpark().
return;
} else {
// Spurious wake up. We loop to try again.
}
}
} else {
// Wait for unpark() to produce this event.
c::WaitForSingleObject(self.event_handle(), c::INFINITE);
// Set the state back to EMPTY (from either PARKED or NOTIFIED).
// Note that we don't just write EMPTY, but use swap() to also
// include an acquire-ordered read to synchronize with unpark()'s
// release-ordered write.
self.state.swap(EMPTY, Acquire);
}
}
// Assumes this is only called by the thread that owns the Parker,
// which means that `self.state != PARKED`.
pub unsafe fn park_timeout(&self, timeout: Duration) {
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
// first case.
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
if c::WaitOnAddress::is_available() {
// Wait for something to happen, assuming it's still set to PARKED.
c::WaitOnAddress(
self.ptr(),
&PARKED as *const _ as c::LPVOID,
1,
dur2timeout(timeout),
);
// Set the state back to EMPTY (from either PARKED or NOTIFIED).
// Note that we don't just write EMPTY, but use swap() to also
// include an acquire-ordered read to synchronize with unpark()'s
// release-ordered write.
if self.state.swap(EMPTY, Acquire) == NOTIFIED {
// Actually woken up by unpark().
} else {
// Timeout or spurious wake up.
// We return either way, because we can't easily tell if it was the
// timeout or not.
}
} else {
// Wait for unpark() to produce this event.
let unparked = c::WaitForSingleObject(self.event_handle(), dur2timeout(timeout))
== c::WAIT_OBJECT_0;
// Set the state back to EMPTY (from either PARKED or NOTIFIED).
let prev_state = self.state.swap(EMPTY, Acquire);
if !unparked && prev_state == NOTIFIED {
// Timed out (or error)
}
}
}
pub fn unpark(&self) {
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
// wake the thread in the first case.
//
// Note that even NOTIFIED=>NOTIFIED results in a write. This is on
// purpose, to make sure every unpark() has a release-acquire ordering
// with park().
if self.state.swap(NOTIFIED, Release) == PARKED {
if c::WakeByAddressSingle::is_available() {
unsafe {
c::WakeByAddressSingle(self.ptr());
}
} else {
unsafe {
c::SetEvent(self.event_handle());
}
}
}
}
fn ptr(&self) -> c::LPVOID {
&self.state as *const _ as c::LPVOID
}
fn event_handle(&self) -> c::HANDLE {
const INVALID: u32 = !0; // c::INVALID_HANDLE_VALUE
match self.event_handle.load(Relaxed) {
INVALID => {
let handle = unsafe {
c::CreateEventW(
ptr::null_mut(),
0, // Auto-reset event object
0, // Initially nonsignaled
ptr::null(),
)
};
if handle.is_null() {
panic!("Unable to create event handle: error {}", unsafe {
c::GetLastError()
});
}
match self
.event_handle
.compare_exchange(INVALID, handle as u32, Relaxed, Relaxed)
{
Ok(_) => handle,
Err(h) => {
// Lost the race to another thread initializing HANDLE before we did.
// Closing our handle and using theirs instead.
unsafe {
c::CloseHandle(handle);
}
h as c::HANDLE
}
}
}
handle => handle as c::HANDLE,
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment