@codingskynet
Last active October 6, 2021 13:27
C++11 Memory Model: From Atomic to Lock-Free Data Structures - Examples
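The Rust examples below accompany the post: several spinlock variants (compare_exchange with crossbeam's Backoff, a bare compare_exchange_weak loop, a bare compare_exchange loop, and test-and-test-and-set), a counter built on AtomicI32::fetch_add with Relaxed ordering, and a Treiber stack together with an elimination-backoff stack (EBStack) based on crossbeam-epoch. Building them requires the crossbeam, crossbeam-epoch, crossbeam-utils, and rand crates as dependencies.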
// Spinlock benchmark: strong compare_exchange plus crossbeam's Backoff,
// which spins briefly and then yields the thread while the lock stays contended.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

use crossbeam::utils::Backoff;

struct SpinLock {
    flag: AtomicBool,
}

impl SpinLock {
    fn new() -> Self {
        Self {
            flag: AtomicBool::new(false),
        }
    }

    fn lock(&self) {
        let backoff = Backoff::new();

        // Acquire on success pairs with the Release store in unlock().
        while self
            .flag
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            backoff.snooze();
        }
    }

    fn unlock(&self) {
        self.flag.store(false, Ordering::Release);
    }
}

// Wrapper that lets a raw pointer to the shared counter cross thread boundaries.
struct Counter(*mut i32);

unsafe impl Send for Counter {}

fn main() {
    let iter = 16;
    let mut result = 0;

    for _ in 0..iter {
        let mut threads = Vec::new();
        let lock = Arc::new(SpinLock::new());
        let mut counter = 0;

        let start = Instant::now();

        // Four threads each increment the shared counter 1,000,000 times under the lock.
        for _ in 0..4 {
            let lock = Arc::clone(&lock);
            let counter = Counter(&mut counter as *mut _);

            threads.push(thread::spawn(move || {
                for _ in 0..1_000_000 {
                    lock.lock();
                    unsafe {
                        *(counter.0) += 1;
                    }
                    lock.unlock();
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        debug_assert_eq!(counter, 4_000_000);

        let end = start.elapsed().as_millis();
        result += end;
    }

    println!("avg: {} ms", result / iter);
}
// Spinlock benchmark: compare_exchange_weak in a bare busy-wait loop (no backoff).
// The weak variant may fail spuriously, which is harmless inside a retry loop.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

struct SpinLock {
    flag: AtomicBool,
}

impl SpinLock {
    fn new() -> Self {
        Self {
            flag: AtomicBool::new(false),
        }
    }

    fn lock(&self) {
        // Acquire on success pairs with the Release store in unlock().
        while self
            .flag
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {}
    }

    fn unlock(&self) {
        self.flag.store(false, Ordering::Release);
    }
}

// Wrapper that lets a raw pointer to the shared counter cross thread boundaries.
struct Counter(*mut i32);

unsafe impl Send for Counter {}

fn main() {
    let iter = 16;
    let mut result = 0;

    for _ in 0..iter {
        let mut threads = Vec::new();
        let lock = Arc::new(SpinLock::new());
        let mut counter = 0;

        let start = Instant::now();

        // Four threads each increment the shared counter 1,000,000 times under the lock.
        for _ in 0..4 {
            let lock = Arc::clone(&lock);
            let counter = Counter(&mut counter as *mut _);

            threads.push(thread::spawn(move || {
                for _ in 0..1_000_000 {
                    lock.lock();
                    unsafe {
                        *(counter.0) += 1;
                    }
                    lock.unlock();
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        debug_assert_eq!(counter, 4_000_000);

        let end = start.elapsed().as_millis();
        result += end;
    }

    println!("avg: {} ms", result / iter);
}
// Lock-free counter benchmark: four threads each perform 2,000,000 fetch_add(1) calls
// on an AtomicI32 with Relaxed ordering (only the final total matters, not the order).
use std::{
    sync::{
        atomic::{AtomicI32, Ordering},
        Arc,
    },
    thread,
    time::Instant,
};

fn main() {
    let iter = 16;
    let mut result = 0;

    for _ in 0..iter {
        let mut threads = Vec::new();
        let counter = Arc::new(AtomicI32::new(0));

        let start = Instant::now();

        for _ in 0..4 {
            let counter = Arc::clone(&counter);

            threads.push(thread::spawn(move || {
                for _ in 0..2_000_000 {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        debug_assert_eq!(counter.load(Ordering::Relaxed), 8_000_000);

        result += start.elapsed().as_millis();
    }

    println!("avg: {} ms", result / iter);
}
// Spinlock benchmark: strong compare_exchange in a bare busy-wait loop, no backoff.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

struct SpinLock {
    flag: AtomicBool,
}

impl SpinLock {
    fn new() -> Self {
        Self {
            flag: AtomicBool::new(false),
        }
    }

    fn lock(&self) {
        // Acquire on success pairs with the Release store in unlock().
        while self
            .flag
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {}
    }

    fn unlock(&self) {
        self.flag.store(false, Ordering::Release);
    }
}

// Wrapper that lets a raw pointer to the shared counter cross thread boundaries.
struct Counter(*mut i32);

unsafe impl Send for Counter {}

fn main() {
    let iter = 16;
    let mut result = 0;

    for _ in 0..iter {
        let mut threads = Vec::new();
        let lock = Arc::new(SpinLock::new());
        let mut counter = 0;

        let start = Instant::now();

        // Four threads each increment the shared counter 1,000,000 times under the lock.
        for _ in 0..4 {
            let lock = Arc::clone(&lock);
            let counter = Counter(&mut counter as *mut _);

            threads.push(thread::spawn(move || {
                for _ in 0..1_000_000 {
                    lock.lock();
                    unsafe {
                        *(counter.0) += 1;
                    }
                    lock.unlock();
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        debug_assert_eq!(counter, 4_000_000);

        let end = start.elapsed().as_millis();
        result += end;
    }

    println!("avg: {} ms", result / iter);
}
// Treiber stack (lock-free stack) and an elimination-backoff stack (EBStack) built on top of it,
// using crossbeam-epoch for safe memory reclamation. main() benchmarks the Treiber stack with a
// mixed push/pop workload across 1, 2, 4, ..., 128 threads.
use std::time::Instant;

use rand::{thread_rng, Rng};
use std::{mem::ManuallyDrop, ptr, sync::atomic::Ordering, thread, time::Duration};

use crossbeam_epoch::{pin, Atomic, Guard, Owned, Shared};
use crossbeam_utils::{thread::scope, Backoff};

pub trait ConcurrentStack<V> {
    fn new() -> Self;
    fn push(&self, value: V, guard: &Guard);
    fn pop(&self, guard: &Guard) -> Option<V>;
}

pub struct TreiberStack<V> {
    head: Atomic<Node<V>>,
}

impl<V> Default for TreiberStack<V> {
    fn default() -> Self {
        Self::new()
    }
}

struct Node<V> {
    value: ManuallyDrop<V>,
    next: Atomic<Node<V>>,
}

impl<V> Node<V> {
    fn new(value: V) -> Self {
        Self {
            value: ManuallyDrop::new(value),
            next: Atomic::null(),
        }
    }
}

impl<V> TreiberStack<V> {
    fn try_push(&self, node: Owned<Node<V>>, guard: &Guard) -> Result<(), Owned<Node<V>>> {
        let head = self.head.load(Ordering::Relaxed, guard);
        node.next.store(head, Ordering::Relaxed);

        // Publish the new node as head; on failure, hand the node back to the caller for retry.
        match self
            .head
            .compare_exchange(head, node, Ordering::Release, Ordering::Relaxed, guard)
        {
            Ok(_) => Ok(()),
            Err(e) => Err(e.new),
        }
    }

    fn try_pop(&self, guard: &Guard) -> Result<Option<V>, ()> {
        let head = self.head.load(Ordering::Acquire, guard);

        if let Some(h) = unsafe { head.as_ref() } {
            let next = h.next.load(Ordering::Relaxed, guard);

            if self
                .head
                .compare_exchange(head, next, Ordering::Relaxed, Ordering::Relaxed, guard)
                .is_ok()
            {
                // The node is unlinked; defer freeing it until no thread can still reference it.
                unsafe { guard.defer_destroy(head) };
                return unsafe { Ok(Some(ManuallyDrop::into_inner(ptr::read(&(*h).value)))) };
            }

            return Err(());
        } else {
            return Ok(None);
        }
    }
}

impl<V> ConcurrentStack<V> for TreiberStack<V> {
    fn new() -> Self {
        Self {
            head: Atomic::null(),
        }
    }

    fn push(&self, value: V, guard: &Guard) {
        let mut node = Owned::new(Node::new(value));
        let backoff = Backoff::new();

        while let Err(e) = self.try_push(node, guard) {
            node = e;
            backoff.spin();
        }
    }

    fn pop(&self, guard: &Guard) -> Option<V> {
        let backoff = Backoff::new();

        loop {
            if let Ok(value) = self.try_pop(guard) {
                return value;
            }
            backoff.spin();
        }
    }
}

// Elimination array: each slot's pointer tag encodes its state.
// 0: empty, 1: a pusher's node is waiting, 2: a popper is waiting, 3: the pair has matched.
const ELIM_SIZE: usize = 16;
const ELIM_DELAY: Duration = Duration::from_millis(1);

pub struct EBStack<V> {
    stack: TreiberStack<V>,
    slots: [Atomic<Node<V>>; ELIM_SIZE],
}

#[inline]
fn rand_idx() -> usize {
    thread_rng().gen_range(0..ELIM_SIZE)
}

impl<V> Default for EBStack<V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<V> EBStack<V> {
    fn try_push(&self, node: Owned<Node<V>>, guard: &Guard) -> Result<(), Owned<Node<V>>> {
        // Try the plain Treiber push first; fall back to elimination on contention.
        let node = match self.stack.try_push(node, guard) {
            Ok(_) => return Ok(()),
            Err(node) => node.into_shared(guard),
        };

        let slot = unsafe { self.slots.get_unchecked(rand_idx()) };
        let s = slot.load(Ordering::Relaxed, guard);
        let tag = s.tag();

        let result = match tag {
            // Empty slot: park our node there and wait for a popper.
            0 => slot.compare_exchange(
                s,
                node.with_tag(1),
                Ordering::Release,
                Ordering::Relaxed,
                guard,
            ),
            // A popper is waiting: hand the node over directly.
            2 => slot.compare_exchange(
                s,
                node.with_tag(3),
                Ordering::Release,
                Ordering::Relaxed,
                guard,
            ),
            _ => return unsafe { Err(node.into_owned()) },
        };

        if let Err(e) = result {
            return unsafe { Err(e.new.into_owned()) };
        }

        thread::sleep(ELIM_DELAY);

        let s = slot.load(Ordering::Relaxed, guard);

        if tag == 0 && s.tag() == 1 {
            // No popper showed up: try to take the node back and retry;
            // if the CAS fails, a popper already claimed it and the push succeeded.
            return match slot.compare_exchange(
                node.with_tag(1),
                Shared::null(),
                Ordering::Relaxed,
                Ordering::Relaxed,
                guard,
            ) {
                Ok(_) => unsafe { Err(s.into_owned()) },
                Err(_) => Ok(()),
            };
        }

        Ok(())
    }

    fn try_pop(&self, guard: &Guard) -> Result<Option<V>, ()> {
        // Try the plain Treiber pop first; fall back to elimination on contention.
        if let Ok(value) = self.stack.try_pop(guard) {
            return Ok(value);
        }

        let slot = unsafe { self.slots.get_unchecked(rand_idx()) };
        let s = slot.load(Ordering::Relaxed, guard);

        let result = match s.tag() {
            // Empty slot: announce that a popper is waiting.
            0 => slot.compare_exchange(
                s,
                s.with_tag(2),
                Ordering::Relaxed,
                Ordering::Relaxed,
                guard,
            ),
            // A pusher's node is waiting: claim it by marking the slot as matched.
            1 => slot.compare_exchange(
                s,
                s.with_tag(3),
                Ordering::Relaxed,
                Ordering::Relaxed,
                guard,
            ),
            _ => return Err(()),
        };

        if result.is_err() {
            return Err(());
        }

        thread::sleep(ELIM_DELAY);

        let s = slot.load(Ordering::Acquire, guard);

        if s.tag() == 3 {
            // Matched with a pusher: clear the slot and take ownership of the node's value.
            slot.store(Shared::null(), Ordering::Relaxed);
            let node = unsafe { s.into_owned() };
            let value = ManuallyDrop::into_inner(node.into_box().value);
            Ok(Some(value))
        } else {
            slot.store(Shared::null(), Ordering::Relaxed);
            Err(())
        }
    }
}

impl<V> ConcurrentStack<V> for EBStack<V> {
    fn new() -> Self {
        Self {
            stack: TreiberStack::new(),
            slots: Default::default(),
        }
    }

    fn push(&self, value: V, guard: &Guard) {
        let mut node = Owned::new(Node::new(value));

        while let Err(e) = self.try_push(node, guard) {
            node = e;
        }
    }

    fn pop(&self, guard: &Guard) -> Option<V> {
        loop {
            if let Ok(value) = self.try_pop(guard) {
                return value;
            }
        }
    }
}

fn main() {
    let op = 100_000;
    let mut thread = 1;

    while thread <= 128 {
        let iter = 16;
        let mut result = 0;

        for _ in 0..iter {
            let stack = TreiberStack::new();
            let start = Instant::now();

            scope(|scope| {
                for _ in 0..thread {
                    scope.spawn(|_| {
                        // Each thread performs `op` operations, pushing or popping at random.
                        for i in 0..op {
                            if thread_rng().gen::<i32>() % 2 == 0 {
                                stack.push(i, &pin());
                            } else {
                                stack.pop(&pin());
                            }
                        }
                    });
                }
            })
            .unwrap();

            result += start.elapsed().as_millis();
        }

        println!("thread {}, avg: {} ms", thread, result / iter);

        thread *= 2;
    }
}
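The benchmark in main() above exercises only TreiberStack. Since EBStack implements the same ConcurrentStack trait, a minimal sketch (not part of the original gist) for measuring the elimination-backoff variant is to swap the constructor in main():

// in main(), construct the elimination-backoff stack instead of the plain Treiber stack:
let stack = EBStack::new();
// the scoped-thread push/pop workload that follows is unchanged.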
// Spinlock benchmark: test-and-test-and-set (TTAS) — spin on a cheap Relaxed load
// and only attempt the compare_exchange once the flag appears to be free.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

struct SpinLock {
    flag: AtomicBool,
}

impl SpinLock {
    fn new() -> Self {
        Self {
            flag: AtomicBool::new(false),
        }
    }

    fn lock(&self) {
        loop {
            // Read-only spin while the lock is held, to avoid hammering the cache line with CAS.
            if self.flag.load(Ordering::Relaxed) {
                continue;
            }

            if self
                .flag
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                break;
            }
        }
    }

    fn unlock(&self) {
        self.flag.store(false, Ordering::Release);
    }
}

// Wrapper that lets a raw pointer to the shared counter cross thread boundaries.
struct Counter(*mut i32);

unsafe impl Send for Counter {}

fn main() {
    let iter = 16;
    let mut result = 0;

    for _ in 0..iter {
        let mut threads = Vec::new();
        let lock = Arc::new(SpinLock::new());
        let mut counter = 0;

        let start = Instant::now();

        // Four threads each increment the shared counter 1,000,000 times under the lock.
        for _ in 0..4 {
            let lock = Arc::clone(&lock);
            let counter = Counter(&mut counter as *mut _);

            threads.push(thread::spawn(move || {
                for _ in 0..1_000_000 {
                    lock.lock();
                    unsafe {
                        *(counter.0) += 1;
                    }
                    lock.unlock();
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        debug_assert_eq!(counter, 4_000_000);

        let end = start.elapsed().as_millis();
        result += end;
    }

    println!("avg: {} ms", result / iter);
}