Skip to content

Instantly share code, notes, and snippets.

@fwqaaq
Last active June 26, 2024 07:37
Show Gist options
  • Save fwqaaq/75f41f0ba39e3ea76e0d5ff2d0b5cce3 to your computer and use it in GitHub Desktop.
Save fwqaaq/75f41f0ba39e3ea76e0d5ff2d0b5cce3 to your computer and use it in GitHub Desktop.
Rust 常用示例

Send 和 Sync

这两个类型有一定联系,但是并不是由谁决定谁。

  • 仅实现 Send:Cell 以及 RefCell,可以在线程之间移动所有权,但是不能共享,也就是不能多个线程同时修改其内部值
  • 仅实现 Sync:MutexGuard(各种锁的 guard),它们的共享引用可以在线程之间解锁(lock),但是所有权不能直接被转移
  • 实现 Send 和 Sync:大多数类型都设置了 Send 和 Sync

参考:https://zhuanlan.zhihu.com/p/24142191异步中的 Send 和 Sync

Pin

在 Rust 中,Pin 类型用于确保数据在内存中的位置是固定的,即它们的内存地址不会改变。

在构建自引用结构时非常有用,因为如果一个对象包含指向自身的指针,那么移动该对象将使这些指针失效,可能导致未定义的行为。

Pin<P> 确保任何指针类型 P 的指向对象在内存中的位置是稳定的,这意味着它不能被移动到其他地方,也不能在它被丢弃之前释放其内存。PhantomPinned 的主要用途是阻止一个类型自动实现 Unpin trait。它告诉编译器,即使所有字段都是 Unpin,这个类型也应该被视为不可移动的。

use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};

struct SelfReferential {
    self_ptr: Option<NonNull<Self>>,
    _pin: PhantomPinned,
}

impl SelfReferential {
    fn new() -> Pin<Box<Self>> {
        let mut b = Box::pin(SelfReferential {
            self_ptr: None,
            _pin: PhantomPinned,
        });
        let self_ptr = NonNull::from(&*b);
        unsafe {
            b.as_mut().get_unchecked_mut().self_ptr = Some(self_ptr);
        }
        b
    }

    fn verify_address(&self) -> bool {
        match self.self_ptr {
            Some(ptr) => ptr.as_ptr() == self as *const Self as *mut Self,
            None => false,
        }
    }
}

struct Moveable {
    data: String,
    data_ptr: *const String,
}

impl Moveable {
    fn new(data: String) -> Self {
        let data_ptr = &data as *const String;
        Moveable { data, data_ptr }
    }

    fn verify_address(&self) -> bool {
        self.data_ptr == &self.data as *const String
    }
}

fn main() {
    let sr = SelfReferential::new();
    println!("Address of self: {}", sr.verify_address());

    let m = Moveable::new("hello".to_string());
    println!("Address of data: {}", m.verify_address());
}

使用 pin-project-lite 宏可以简化 Pin,并且非常轻量。看 hyper-util 中的示例,#[pin]TokioIo<T> 被用作 Pin<&mut TokioIo<T>> 时,你可以安全地获取 inner 字段作为 Pin<&mut T>(它会生成一个 project 方法),下面可以模拟一下不使用该宏的方式(直接使用 .project() 获取 &mut T 而不需要 .inner 拿到 &mut T):

#[derive(Debug)]
pub struct TokioIo<T> {
    inner: T,
}

impl<T> TokioIo<T> {
    //...

    // 这个方法用于安全地获取内部字段的可变引用
    fn project(self: Pin<&mut Self>) -> Pin<&mut T> {
        // 不会移动任何被固定的数据
        unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
    }
}

Arc 来自引用

在 Rust 中 Arc<RwLock<T>>Arc<Mutex<T>> 确实也可以用来实现自引用数据结构。那么为什么仍然需要 Pin 呢?

  1. 内存安全保证更强:Pin 提供了在编译期验证数据不移动的保证。而 Mutex/RwLock 在运行时需要手动加锁,如果使用不当可能导致数据竞争。从内存安全的角度看,Pin 更胜一筹。
  2. 性能更好:Pin 不需要运行时锁的开销。并且编译器可以进行更多优化。所以 Pin 通常性能更好。
  3. 更符合 Rust 的设计哲学:Pin 基于所有权和借用等 Rust 核心概念设计,提供零成本的抽象。而 Mutex/RwLock 更像是“锁”这种传统并发机制。从语言自洽性考虑,Pin 更符合 Rust。
  4. 适用场景不同:Pin 适用于单线程场景下的自引用。而 Mutex/RwLock 适用于多线程共享的可变状态。两者侧重点不同。
use std::sync::{Arc, Mutex, Weak};

#[derive(Debug)]
struct Node {
    value: i32,
    next: Option<Weak<Mutex<Node>>>,
}

fn main() {
    let node1 = Arc::new(Mutex::new(Node {
        value: 1,
        next: None,
    }));
    let node2 = Arc::new(Mutex::new(Node {
        value: 2,
        next: None,
    }));

    // 添加循环的 Weak 引用
    {
        let mut _node1 = node1.lock().unwrap();
        _node1.next = Some(Arc::downgrade(&node2));

        let mut _node2 = node2.lock().unwrap();
        _node2.next = Some(Arc::downgrade(&node2));
    }
    {
        let node1 = node1.lock().unwrap();
        // 使用强引用访问 Node
        println!(
            "node2 value: {:#?}",
            node1.next.clone().unwrap().upgrade().unwrap().lock().unwrap().value
        );
    }
}

示例

参考:https://stackoverflow.com/questions/32300132/why-cant-i-store-a-value-and-a-reference-to-that-value-in-the-same-struct

Future

  1. Rust 的 future 不表示后台计算:在某些语言中,future 或 promise 可能表示一个正在后台线程或其他执行上下文中运行的计算。你创建一个 future,然后它就在后台执行,你可以在某个时候询问其结果。
  2. Rust 的 future 是计算本身:在 Rust 中,一个 future 只是计算的一个表示,它封装了计算的逻辑,但并不自己执行。它包含了异步计算的所有必要状态和逻辑,但只有当你显式询问它时(通过轮询它),它才会进一步。
  3. future 的所有者负责轮询:在 Rust 中,future 不会自己推进自己的计算。相反,它必须被轮询,这通常由执行器(例如 tokio 或 async-std 的运行时)完成。当 future 被轮询时,它将执行尽可能多的计算,直到达到一个暂停点(可能是等待 I/O、定时器等)。然后,它将控制权返回给执行器,该执行器将决定何时再次轮询 future。
  4. 通过调用 Future::poll 来轮询:在 Rust 中,轮询一个 future 的方法是调用其 poll 方法。这个方法会尽可能推进计算,并返回一个 Poll::Ready 如果计算完成,或者 Poll::Pending 如果计算被暂停并准备在以后继续。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Duration};

struct MyFuture {
    completed: bool,
}

impl Future for MyFuture {
    type Output = Result<&'static str, &'static str>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.completed {
            // Simulate an asynchronous operation
            let waker = cx.waker().clone();
            let task = async move {
                sleep(Duration::from_secs(2)).await;
                waker.wake(); // Wake the task after completion
            };
            tokio::spawn(task);

            self.completed = true;
            Poll::Pending
        } else {
            Poll::Ready(Ok("Hello, world!")) // Return the result directly after completion
        }
    }
}

#[tokio::main]
async fn main() {
    let future = MyFuture { completed: false };

    // Explicitly poll the future, starting the asynchronous computation
    match future.await {
        Ok(val) => println!("Result: {}", val),
        Err(e) => println!("Error: {}", e),
    }
}

Future 与 Promise 的区别

  1. 执行模型:
  • Rust 的 Futures:Rust 的 futures 是一种协作式异步模型,必须由执行器(如 tokio 或 async-std 的运行时)显式轮询。future 不会自动执行,也不会自动调度。与之相反,它们通过轮询机制,可以是无栈的,每次轮询可以推进计算。
  • JavaScript 的 Promises:JavaScript 的 promises 是由事件循环自动处理的。一旦 promise 被创建并具有适当的兑现或拒绝处理程序,它就会自动安排在事件循环中运行。与 Rust 的 futures 不同,promises 不需要外部执行器显式轮询。
  1. 栈与无栈:
  • Rust 的 Futures:Rust 支持无栈异步编程,其中每个 .await 表达式可以被视为一个挂起点,在该点上控制权可以返回到调用者。这使得异步操作可以在不消耗大量栈空间的情况下高效执行。
  • JavaScript 的 Promises:虽然 JavaScript 的 promises 也具有非阻塞性,但 JavaScript 运行时通常使用事件循环来处理异步操作,而不是使用无栈协程。这意味着每个异步操作都可能在事件队列中作为单独的入口,而不是以连续的栈帧方式组织。
  1. 组合和控制:
  • Rust 的 Futures:Rust 的 futures 提供了非常精确的控制,可以方便地组合和转换。Rust 的 futures 可以被暂停、恢复和组合,为复杂的并发模式提供了强大的构建块。
  • JavaScript 的 Promises:promises 也可以组合,例如通过 Promise.all 或 Promise.race,但它们提供的控制粒度不如 Rust 的 futures。

Channel

Rust 标准库提供了异步 channel(channel),以及同步 channel(sync_channel)。同步 Channel 中可以设置缓冲区,如果发送的消息大于缓冲区设置的值(N),那么第 N + 1 条消息就会阻塞。

  • mpsc:允许多个发送(生产者)线程和一个接收(消费者)线程之间的通信。发送者可以持续发送消息,而接收者可以逐个接收这些消息。
  • oneshot:是一种特殊类型的 channel,只允许发送一次消息。发送者发送一个消息后,就不能再发送了;接收者接收到这个消息后,channel 就会关闭。

CSP

CSP(Communicating Sequential Processes)是由一组并发执行的进程组成的。这些进程利用 channel 进行通信,并且只能通过这些 channels 发送或接收消息。

  1. 进程 (Processe):并发执行的实体,它们执行计算并通过 channel 进行通讯。
  2. 通道 (Channel):通讯的路径,进程可以通过它们发送或接收消息。
  3. 同步通信:在 CSP 中,通信是同步的。这意味着发送进程会阻塞,直到接收进程准备好接收消息。
  4. 并发组合:进程可以并发组合,形成更大的并发系统。

Rust 标准库提供了异步 channel(channel),以及同步 channel(sync_channel)。同步 Channel 中可以设置缓冲区,如果发送的消息大于缓冲区设置的值(N),那么第 N + 1 条消息就会阻塞。

以下是 CSP 模型的图以及实现:

mpsc

需要启动:mini-redis-server

use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        value: Bytes,
        resp: Responder<()>,
    },
}

type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    // Clone a second transmitter, corresponding to Get and Set requests
    let tx2 = tx.clone();

    // Client task
    let manager = tokio::spawn(async move {
        let mut client = mini_redis::client::connect("127.0.0.1:6379").await.unwrap();
        while let Some(cmd) = rx.recv().await {
            match cmd {
                Command::Get { key, resp } => {
                    let res = client.get(&key).await;
                    let _ = resp.send(res);
                }
                Command::Set { key, value, resp } => {
                    let res = client.set(&key, value).await;
                    let _ = resp.send(res);
                }
            }
        }
    });

    // Spawn two tasks, one setting a value and other getting a value
    let t1 = tokio::spawn(async move {
        // Create a oneshot channel
        let (resp_tx, resp_rx) = oneshot::channel();
        let cmd = Command::Get {
            key: "hello".to_string(),
            resp: resp_tx,
        };

        // Send the GET request
        if tx.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        // Await the response
        let res = resp_rx.await;
        println!("GOT {{Get}} = {:?}", res);
    });

    let t2 = tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();
        let cmd = Command::Set {
            key: "hello".to_string(),
            value: "world".into(),
            resp: resp_tx,
        };

        if tx2.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        let res = resp_rx.await;
        println!("GOT {{Set}} = {:?}", res);
    });

    t1.await.unwrap();
    t2.await.unwrap();
    manager.await.unwrap();
}

参考:https://course.rs/advance/concurrency-with-threads/message-passing.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment