Skip to content

Instantly share code, notes, and snippets.

@jeehoonkang
Created September 18, 2017 13:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeehoonkang/7064e13a43b77354163a391c7e4254d6 to your computer and use it in GitHub Desktop.
Save jeehoonkang/7064e13a43b77354163a391c7e4254d6 to your computer and use it in GitHub Desktop.
Intrusive Concurrent MPSC Queue
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/
//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between threads, and is also used as the
//! building block of channels in rust.
//!
//! Note that the current implementation of this queue has a caveat concerning
//! the `pop` method; see that method's documentation for details. Due to this
//! caveat, this queue may not be appropriate for all use-cases.
// http://www.1024cores.net/home/lock-free-algorithms
// /queues/intrusive-mpsc-node-based-queue
pub use self::PopResult::*;
use std::prelude::v1::*;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
/// A result of the `pop` function.
///
/// `Data` carries a raw pointer to the node that was unlinked; the type system
/// does not track who owns that node, so the caller is responsible for it.
pub enum PopResult<T: Send> {
/// Some data has been popped; the pointer is the node that was unlinked.
Data(*mut Node<T>),
/// The queue is empty
Empty,
/// The queue is in an inconsistent state. Popping data should succeed, but
/// some pushers have yet to make enough progress in order to allow a pop to
/// succeed. It is recommended that a pop() occur "in the near future" in
/// order to see if the sender has made progress or not
Inconsistent,
}
/// Intrusive link that a value embeds in order to be placed in a [`Queue`].
///
/// The queue never allocates or frees nodes handed to `push`; it only threads
/// them together through `next`.
#[derive(Debug)]
pub struct Node<T: Send> {
    /// Next node towards the producer end, or null while this node is last.
    next: AtomicPtr<Node<T>>,
    /// `T` is otherwise mentioned only through the recursive `next` link,
    /// which rustc rejects as an unused parameter. `fn() -> T` keeps the node
    /// `Send + Sync` independently of `T`.
    _marker: std::marker::PhantomData<fn() -> T>,
}

/// Owning handle to the heap-allocated stub node.
///
/// `head`/`tail` keep raw pointers to the stub, so it must have a stable
/// address even when the `Queue` value itself is moved. A raw pointer (rather
/// than a `Box` field) is used so that moving the queue never invalidates the
/// aliases stored in `head`/`tail`.
#[derive(Debug)]
struct StubBox<T: Send>(ptr::NonNull<Node<T>>);

impl<T: Send> StubBox<T> {
    /// Allocates a fresh, unlinked stub node on the heap.
    fn new() -> Self {
        let raw = Box::into_raw(Box::new(Node::default()));
        // SAFETY: `Box::into_raw` never returns a null pointer.
        StubBox(unsafe { ptr::NonNull::new_unchecked(raw) })
    }

    /// Address of the stub node; stable for the lifetime of the handle.
    fn as_ptr(&self) -> *mut Node<T> {
        self.0.as_ptr()
    }
}

impl<T: Send> Drop for StubBox<T> {
    fn drop(&mut self) {
        // SAFETY: the pointer came from `Box::into_raw` in `StubBox::new` and
        // is released exactly once, here.
        unsafe { drop(Box::from_raw(self.0.as_ptr())) }
    }
}

/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
#[derive(Debug)]
pub struct Queue<T: Send> {
    /// Producer end: the most recently pushed node.
    head: AtomicPtr<Node<T>>,
    /// Consumer end: the next node `pop` will inspect.
    tail: AtomicPtr<Node<T>>,
    /// Sentinel node owned by the queue; never handed out by `pop`.
    stub: StubBox<T>,
}

// SAFETY: the raw pointers inside the queue are only touched through atomic
// operations in `push`, and through `pop`, whose single-consumer requirement
// is part of its `unsafe` contract.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T: Send> Default for Node<T> {
    /// Creates an unlinked node (`next` is null).
    fn default() -> Self {
        Self {
            next: AtomicPtr::default(),
            _marker: std::marker::PhantomData,
        }
    }
}

impl<T: Send> Default for Queue<T> {
    /// Equivalent to [`Queue::new`]. A field-wise default would leave
    /// `head`/`tail` null, which `push` and `pop` dereference.
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Send> Queue<T> {
    /// Creates a new queue that is safe to share among multiple producers and
    /// one consumer.
    ///
    /// The stub node lives on the heap, so the returned queue may be moved
    /// freely without invalidating `head`/`tail`.
    pub fn new() -> Queue<T> {
        let stub = StubBox::new();
        let stub_ptr = stub.as_ptr();
        Queue {
            head: AtomicPtr::new(stub_ptr),
            tail: AtomicPtr::new(stub_ptr),
            stub,
        }
    }

    /// Pushes a new value onto this queue.
    ///
    /// The queue keeps a raw pointer to `n`: the caller must keep the node
    /// alive, and must not push it again, until `pop` has returned it.
    pub fn push(&self, n: &Node<T>) {
        n.next.store(ptr::null_mut(), Ordering::Relaxed);
        let np = n as *const Node<T> as *mut Node<T>;
        // Publish `n` as the new head first, then link the previous head to
        // it. Between these two steps the list is temporarily cut, which is
        // what `PopResult::Inconsistent` reports.
        let prev = self.head.swap(np, Ordering::AcqRel);
        // SAFETY: `prev` is either the stub (owned by `self`) or a node that
        // was handed to `push` and has not yet been returned by `pop`.
        unsafe {
            (*prev).next.store(np, Ordering::Release);
        }
    }

    /// Pops some data from this queue.
    ///
    /// Note that the current implementation means that this function cannot
    /// return `Option<T>`. It is possible for this queue to be in an
    /// inconsistent state where many pushes have succeeded and completely
    /// finished, but pops cannot return `Some(t)`. This inconsistent state
    /// happens when a pusher is pre-empted at an inopportune moment.
    ///
    /// This inconsistent state means that this queue does indeed have data, but
    /// it does not currently have access to it at this time.
    ///
    /// # Safety
    ///
    /// Only one thread may call `pop` at a time, and every node still linked
    /// into the queue must be alive.
    pub unsafe fn pop(&self) -> PopResult<T> {
        let stub = self.stub.as_ptr();
        let mut tail = self.tail.load(Ordering::Relaxed);
        let mut next = (*tail).next.load(Ordering::Acquire);

        // The stub carries no data: step over it.
        if tail == stub {
            if next.is_null() {
                return PopResult::Empty;
            }
            self.tail.store(next, Ordering::Relaxed);
            tail = next;
            next = (*next).next.load(Ordering::Acquire);
        }

        if !next.is_null() {
            self.tail.store(next, Ordering::Relaxed);
            return PopResult::Data(tail);
        }

        // `tail` has no successor. If it is not the head either, a producer
        // has swapped `head` but not yet linked its node.
        if tail != self.head.load(Ordering::Acquire) {
            return PopResult::Inconsistent;
        }

        // `tail` is the last node: re-insert the stub behind it so that
        // `tail` can be detached.
        self.push(&*stub);

        next = (*tail).next.load(Ordering::Acquire);
        if !next.is_null() {
            self.tail.store(next, Ordering::Relaxed);
            return PopResult::Data(tail);
        }

        // A producer slipped in between the `head` check and the stub push
        // and has not linked its node yet. `tail` is still undelivered, so
        // the queue is not empty.
        PopResult::Inconsistent
    }
}
impl<T: Send> Drop for Queue<T> {
    /// Dropping the queue releases none of the linked nodes.
    ///
    /// The queue is intrusive: `push` only borrows a `&Node<T>`, so every
    /// linked node is owned by whoever pushed it. None of those pointers came
    /// from `Box::into_raw`, so passing them to `Box::from_raw` would free
    /// memory the queue does not own.
    fn drop(&mut self) {
        // Intentionally empty: nodes still linked at this point remain the
        // responsibility of their owners, and the queue's own fields are
        // released by their own destructors.
    }
}
@leeopop
Copy link

leeopop commented Sep 19, 2017

I think that we need the following trait.

// Sketch of a trait for types that carry a queue `Node`.
// NOTE(review): as written this is not compilable: both methods take `self`
// by value, `&mut Node` has no lifetime to borrow from, and `Node` is used
// without its type parameter.
pub trait ContainsNode {
	// Access to the node as a mutable reference.
	fn node(self) -> &mut Node;
	// Access to the node as a raw pointer.
	fn node_ptr(self) -> *mut Node;
}

@leeopop
Copy link

leeopop commented Sep 19, 2017

Also, if we use this structure like


// Example container: a payload plus the intrusive queue node stored inline.
struct ValWithNode {
val: u64,
node: Node,
}
// `a` is a local value that owns its node.
let a : ValWithNode = Default::default();
// Only a borrow of the node is handed to the queue; `a` itself is not moved.
queue.push(&a.node);

this code will not pass ownership of `a` into the queue: `push` only borrows `a.node`, so `a` may be dropped while its node is still linked.

@leeopop
Copy link

leeopop commented Sep 19, 2017

If we want to keep `Node`'s `next` field as an `AtomicPtr<Node<T>>`, we should have

// Proposed node layout: the link plus a back-pointer to the containing value.
#[derive(Debug)]
pub struct Node<T: Send + ContainsNode> {
    // Link to the next node in the queue.
    next: AtomicPtr<Node<T>>,
    // Raw pointer back to the `T` that contains this node.
    orig: *mut T,
}

to recover the container from the node.

Alternatively, `next` would have to be an `AtomicPtr<T>`, and we would then obtain the `Node` through the `ContainsNode` trait.

@leeopop
Copy link

leeopop commented Sep 19, 2017

Currently, the stub node is stored inline in `Queue.stub`. However, `head` and `tail` hold pointers to it, so this requires the `Queue` to be non-movable.
I'll change the stub to be allocated on the heap instead.

@leeopop
Copy link

leeopop commented Sep 19, 2017

This is the updated version of the MPSC queue

/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *    1. Redistributions of source code must retain the above copyright notice,
 *       this list of conditions and the following disclaimer.
 *
 *    2. Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between threads, and is also used as the
//! building block of channels in rust.
//!
//! Note that the current implementation of this queue has a caveat concerning
//! the `pop` method; see that method's documentation for details. Due to this
//! caveat, this queue may not be appropriate for all use-cases.

// http://www.1024cores.net/home/lock-free-algorithms
//                         /queues/non-intrusive-mpsc-node-based-queue

// NOTE: this implementation is lifted from the standard library and only
//       slightly modified

pub use self::PopResult::*;
use std::prelude::v1::*;

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Implemented by element types that can hand out a pointer to a [`Node`].
///
/// `Queue::push` calls `node_ptr` on the boxed element and then writes to the
/// node through the returned pointer, so the pointer must be valid for writes.
/// NOTE(review): presumably the node is a field of `self` — confirm against
/// implementors.
pub trait ContainsNode<T: Send + ContainsNode<T>> {
    /// Returns a raw pointer to the node associated with `self`.
    fn node_ptr(&self) -> *mut Node<T>;
}

/// A result of the `pop` function.
pub enum PopResult<T: Send + ContainsNode<T>> {
    /// Some data has been popped; ownership of the element is returned to the
    /// caller as a `Box`.
    Data(Box<T>),
    /// The queue is empty
    Empty,
    /// The queue is in an inconsistent state. Popping data should succeed, but
    /// some pushers have yet to make enough progress in order to allow a pop to
    /// succeed. It is recommended that a pop() occur "in the near future" in
    /// order to see if the sender has made progress or not
    Inconsistent,
}

/// Queue link together with a back-pointer to the element it belongs to.
#[derive(Debug)]
pub struct Node<T: Send + ContainsNode<T>> {
    /// Next node in the queue, or null when there is none.
    next: AtomicPtr<Node<T>>,
    /// Raw pointer to the element this node belongs to. `Queue::push` sets it
    /// from `Box::into_raw`; it is null for the queue's stub node.
    orig: *mut T,
}

impl<T: Send + ContainsNode<T>> Node<T> {
    pub fn new(self_ptr: *mut T) -> Node<T> {
        Node {
            next: AtomicPtr::new(ptr::null_mut()),
            orig: self_ptr,
        }
    }
    pub fn init(&mut self, orig: *mut T) {
        self.next = AtomicPtr::default();
        self.orig = orig;
    }
}

/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
#[derive(Debug)]
pub struct Queue<T: Send + ContainsNode<T>> {
    /// Producer end: atomically swapped by every push.
    head: AtomicPtr<Node<T>>,
    /// Consumer end: a plain pointer, read and written only by `pop` (which
    /// takes `&mut self`) and by `drop`.
    tail: *mut Node<T>,
    /// Heap-allocated sentinel node created in `new`; its `orig` is null.
    stub: *mut Node<T>,
}

// NOTE(review): the raw-pointer fields make `Queue` neither `Send` nor `Sync`
// automatically, so both are asserted by hand. Through `&Queue` only `head` is
// touched, and only atomically; `tail` needs `&mut self` — confirm no other
// shared access is added.
unsafe impl<T: Send + ContainsNode<T>> Send for Queue<T> {}
unsafe impl<T: Send + ContainsNode<T>> Sync for Queue<T> {}

impl<T: Send + ContainsNode<T>> Queue<T> {
    /// Creates a new queue that is safe to share among multiple producers and
    /// one consumer.
    ///
    /// The stub node is allocated on the heap so that `head`/`tail` stay valid
    /// when the `Queue` value is moved.
    pub fn new() -> Queue<T> {
        let stub = Box::into_raw(Box::new(Node::new(ptr::null_mut())));

        Queue {
            head: AtomicPtr::new(stub),
            tail: stub,
            stub,
        }
    }

    /// Pushes a new value onto this queue.
    ///
    /// The queue takes ownership of `object`; it is handed back as a `Box` by
    /// `pop`.
    pub fn push(&self, object: Box<T>) {
        // Consume the box first, so every pointer below is derived from the
        // raw allocation rather than from a `Box` that is about to be moved.
        let obj_ptr = Box::into_raw(object);
        // SAFETY: `obj_ptr` was just produced by `Box::into_raw`, so it points
        // to a live `T`. `node_ptr` must return a pointer that is valid for
        // writes; that is the contract of `ContainsNode`.
        unsafe {
            let node_ptr = (&*obj_ptr).node_ptr();
            debug_assert!(!node_ptr.is_null());
            // Remember the owning allocation so `pop` can rebuild the box.
            (*node_ptr).orig = obj_ptr;
            self.push_internal(node_ptr);
        }
    }

    /// Links `node_ptr` in as the new head of the queue.
    ///
    /// # Safety
    ///
    /// `node_ptr` must point to a live node that is not currently linked into
    /// the queue and that stays alive until `pop` has moved past it.
    unsafe fn push_internal(&self, node_ptr: *mut Node<T>) {
        (*node_ptr).next.store(ptr::null_mut(), Ordering::Relaxed);
        // Publish the node as head first, then link the previous head to it.
        // Between these two steps the list is temporarily cut, which is what
        // `PopResult::Inconsistent` reports.
        let prev = self.head.swap(node_ptr, Ordering::AcqRel);
        (*prev).next.store(node_ptr, Ordering::Release);
    }

    /// Detaches `node` from the consumer end and returns the element that
    /// owns it.
    ///
    /// # Safety
    ///
    /// `node` must be the current tail, must not be the stub, and `next` must
    /// be its non-null successor.
    unsafe fn take(&mut self, node: *mut Node<T>, next: *mut Node<T>) -> PopResult<T> {
        self.tail = next;
        let orig_ptr = (*node).orig;
        debug_assert!(!orig_ptr.is_null());
        PopResult::Data(Box::from_raw(orig_ptr))
    }

    /// Pops some data from this queue.
    ///
    /// Note that the current implementation means that this function cannot
    /// return `Option<T>`. It is possible for this queue to be in an
    /// inconsistent state where many pushes have succeeded and completely
    /// finished, but pops cannot return `Some(t)`. This inconsistent state
    /// happens when a pusher is pre-empted at an inopportune moment.
    ///
    /// This inconsistent state means that this queue does indeed have data, but
    /// it does not currently have access to it at this time.
    ///
    /// # Safety
    ///
    /// Only one thread may call `pop` at a time, and every element still
    /// linked into the queue must have been inserted through `push`.
    pub unsafe fn pop(&mut self) -> PopResult<T> {
        let mut tail = self.tail;
        let mut next = (*tail).next.load(Ordering::Acquire);

        // The stub carries no element: step over it.
        if tail == self.stub {
            if next.is_null() {
                return PopResult::Empty;
            }
            self.tail = next;
            tail = next;
            next = (*next).next.load(Ordering::Acquire);
        }

        if !next.is_null() {
            return self.take(tail, next);
        }

        // `tail` has no successor. If it is not the head either, a producer
        // has swapped `head` but not yet linked its node.
        if tail != self.head.load(Ordering::Acquire) {
            return PopResult::Inconsistent;
        }

        // `tail` is the last node: re-insert the stub behind it so that
        // `tail` can be detached.
        self.push_internal(self.stub);

        next = (*tail).next.load(Ordering::Acquire);
        if !next.is_null() {
            return self.take(tail, next);
        }

        // A producer slipped in between the `head` check and the stub push
        // and has not linked its node yet. `tail` is still undelivered, so
        // the queue is not empty.
        PopResult::Inconsistent
    }
}

impl<T: Send + ContainsNode<T>> Drop for Queue<T> {
    /// Frees every element still linked into the queue, then the stub node.
    fn drop(&mut self) {
        // SAFETY: `&mut self` guarantees no producer or consumer is running.
        // Every node reachable from `tail` is either the stub or a node whose
        // `orig` was set by `push` from `Box::into_raw`. The stub was
        // allocated in `new` with `Box::into_raw` and is released exactly
        // once, after the walk.
        unsafe {
            let mut cur = self.tail;
            while !cur.is_null() {
                // Read the link before freeing: the node lives inside the
                // element that is about to be dropped.
                let next = (*cur).next.load(Ordering::Relaxed);
                // The stub has no owning element (`orig` is null) and may sit
                // anywhere in the chain; it is freed separately below.
                if cur != self.stub {
                    let orig_ptr = (*cur).orig;
                    if !orig_ptr.is_null() {
                        drop(Box::from_raw(orig_ptr));
                    }
                }
                cur = next;
            }
            // The stub may be unreachable from `tail`, so it cannot be freed
            // as part of the walk.
            drop(Box::from_raw(self.stub));
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment