这两个类型有一定联系,但是并不是由谁决定谁。
- 仅实现 Send:Cell 以及 RefCell,可以在线程之间移动所有权,但是不能共享,也就是不能多个线程同时修改其内部值
- 仅实现 Sync:MutexGuard(各种锁的 guard),它们的共享引用可以在线程之间解锁(
lock
),但是所有权不能直接被转移 - 实现 Send 和 Sync:大多数类型都设置了 Send 和 Sync
参考:https://zhuanlan.zhihu.com/p/24142191、异步中的 Send 和 Sync
在 Rust 中,Pin 类型用于确保数据在内存中的位置是固定的,即它们的内存地址不会改变。
在构建自引用结构时非常有用,因为如果一个对象包含指向自身的指针,那么移动该对象将使这些指针失效,可能导致未定义的行为。
Pin<P>
确保任何指针类型 P 的指向对象在内存中的位置是稳定的,这意味着它不能被移动到其他地方,也不能在它被丢弃之前释放其内存。PhantomPinned
的主要用途是阻止一个类型自动实现 Unpin trait。它告诉编译器,即使所有字段都是 Unpin,这个类型也应该被视为不可移动的。
use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
struct SelfReferential {
self_ptr: Option<NonNull<Self>>,
_pin: PhantomPinned,
}
impl SelfReferential {
fn new() -> Pin<Box<Self>> {
let mut b = Box::pin(SelfReferential {
self_ptr: None,
_pin: PhantomPinned,
});
let self_ptr = NonNull::from(&*b);
unsafe {
b.as_mut().get_unchecked_mut().self_ptr = Some(self_ptr);
}
b
}
fn verify_address(&self) -> bool {
match self.self_ptr {
Some(ptr) => ptr.as_ptr() == self as *const Self as *mut Self,
None => false,
}
}
}
struct Moveable {
data: String,
data_ptr: *const String,
}
impl Moveable {
fn new(data: String) -> Self {
let data_ptr = &data as *const String;
Moveable { data, data_ptr }
}
fn verify_address(&self) -> bool {
self.data_ptr == &self.data as *const String
}
}
fn main() {
let sr = SelfReferential::new();
println!("Address of self: {}", sr.verify_address());
let m = Moveable::new("hello".to_string());
println!("Address of data: {}", m.verify_address());
}
使用 pin-project-lite 宏可以简化 Pin,并且非常轻量。看 hyper-util 中的示例,#[pin]
当 TokioIo<T>
被用作 Pin<&mut TokioIo<T>>
时,你可以安全地获取 inner 字段作为 Pin<&mut T>
(它会生成一个 project
方法),下面可以模拟一下不使用该宏的方式(直接使用 .project()
获取 &mut T
而不需要 .inner
拿到 &mut T
):
#[derive(Debug)]
pub struct TokioIo<T> {
inner: T,
}
impl<T> TokioIo<T> {
//...
// 这个方法用于安全地获取内部字段的可变引用
fn project(self: Pin<&mut Self>) -> Pin<&mut T> {
// 不会移动任何被固定的数据
unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
}
}
在 Rust 中 Arc<RwLock<T>>
和 Arc<Mutex<T>>
确实也可以用来实现自引用数据结构。那么为什么仍然需要 Pin 呢?
- 内存安全保证更强:Pin 提供了在编译期验证数据不移动的保证。而 Mutex/RwLock 在运行时需要手动加锁,如果使用不当可能导致数据竞争。从内存安全的角度看,Pin 更胜一筹。
- 性能更好:Pin 不需要运行时锁的开销。并且编译器可以进行更多优化。所以 Pin 通常性能更好。
- 更符合 Rust 的设计哲学:Pin 基于所有权和借用等 Rust 核心概念设计,提供零成本的抽象。而 Mutex/RwLock 更像是“锁”这种传统并发机制。从语言自洽性考虑,Pin 更符合 Rust。
- 适用场景不同:Pin 适用于单线程场景下的自引用。而 Mutex/RwLock 适用于多线程共享的可变状态。两者侧重点不同。
use std::sync::{Arc, Mutex, Weak};
#[derive(Debug)]
struct Node {
value: i32,
next: Option<Weak<Mutex<Node>>>,
}
fn main() {
let node1 = Arc::new(Mutex::new(Node {
value: 1,
next: None,
}));
let node2 = Arc::new(Mutex::new(Node {
value: 2,
next: None,
}));
// 添加循环的 Weak 引用
{
let mut _node1 = node1.lock().unwrap();
_node1.next = Some(Arc::downgrade(&node2));
let mut _node2 = node2.lock().unwrap();
_node2.next = Some(Arc::downgrade(&node2));
}
{
let node1 = node1.lock().unwrap();
// 使用强引用访问 Node
println!(
"node2 value: {:#?}",
node1.next.clone().unwrap().upgrade().unwrap().lock().unwrap().value
);
}
}
- Rust 的 future 不表示后台计算:在某些语言中,future 或 promise 可能表示一个正在后台线程或其他执行上下文中运行的计算。你创建一个 future,然后它就在后台执行,你可以在某个时候询问其结果。
- Rust 的 future 是计算本身:在 Rust 中,一个 future 只是计算的一个表示,它封装了计算的逻辑,但并不自己执行。它包含了异步计算的所有必要状态和逻辑,但只有当你显式询问它时(通过轮询它),它才会进一步。
- future 的所有者负责轮询:在 Rust 中,future 不会自己推进自己的计算。相反,它必须被轮询,这通常由执行器(例如 tokio 或 async-std 的运行时)完成。当 future 被轮询时,它将执行尽可能多的计算,直到达到一个暂停点(可能是等待 I/O、定时器等)。然后,它将控制权返回给执行器,该执行器将决定何时再次轮询 future。
- 通过调用 Future::poll 来轮询:在 Rust 中,轮询一个 future 的方法是调用其 poll 方法。这个方法会尽可能推进计算,并返回一个 Poll::Ready 如果计算完成,或者 Poll::Pending 如果计算被暂停并准备在以后继续。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Duration};
struct MyFuture {
completed: bool,
}
impl Future for MyFuture {
type Output = Result<&'static str, &'static str>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.completed {
// Simulate an asynchronous operation
let waker = cx.waker().clone();
let task = async move {
sleep(Duration::from_secs(2)).await;
waker.wake(); // Wake the task after completion
};
tokio::spawn(task);
self.completed = true;
Poll::Pending
} else {
Poll::Ready(Ok("Hello, world!")) // Return the result directly after completion
}
}
}
#[tokio::main]
async fn main() {
let future = MyFuture { completed: false };
// Explicitly poll the future, starting the asynchronous computation
match future.await {
Ok(val) => println!("Result: {}", val),
Err(e) => println!("Error: {}", e),
}
}
Future 与 Promise 的区别
- 执行模型:
- Rust 的 Futures:Rust 的 futures 是一种协作式异步模型,必须由执行器(如 tokio 或 async-std 的运行时)显式轮询。future 不会自动执行,也不会自动调度。与之相反,它们通过轮询机制,可以是无栈的,每次轮询可以推进计算。
- JavaScript 的 Promises:JavaScript 的 promises 是由事件循环自动处理的。一旦 promise 被创建并具有适当的兑现或拒绝处理程序,它就会自动安排在事件循环中运行。与 Rust 的 futures 不同,promises 不需要外部执行器显式轮询。
- 栈与无栈:
- Rust 的 Futures:Rust 支持无栈异步编程,其中每个 .await 表达式可以被视为一个挂起点,在该点上控制权可以返回到调用者。这使得异步操作可以在不消耗大量栈空间的情况下高效执行。
- JavaScript 的 Promises:虽然 JavaScript 的 promises 也具有非阻塞性,但 JavaScript 运行时通常使用事件循环来处理异步操作,而不是使用无栈协程。这意味着每个异步操作都可能在事件队列中作为单独的入口,而不是以连续的栈帧方式组织。
- 组合和控制:
- Rust 的 Futures:Rust 的 futures 提供了非常精确的控制,可以方便地组合和转换。Rust 的 futures 可以被暂停、恢复和组合,为复杂的并发模式提供了强大的构建块。
- JavaScript 的 Promises:promises 也可以组合,例如通过 Promise.all 或 Promise.race,但它们提供的控制粒度不如 Rust 的 futures。
Rust 标准库提供了异步 channel(channel
),以及同步 channel(sync_channel
)。同步 Channel 中可以设置缓冲区,如果发送的消息大于缓冲区设置的值(N),那么第 N + 1 条消息就会阻塞。
mpsc
:允许多个发送(生产者)线程和一个接收(消费者)线程之间的通信。发送者可以持续发送消息,而接收者可以逐个接收这些消息。oneshot
:是一种特殊类型的 channel,只允许发送一次消息。发送者发送一个消息后,就不能再发送了;接收者接收到这个消息后,channel 就会关闭。
CSP(Communicating Sequential Processes)是由一组并发执行的进程组成的。这些进程利用 channel 进行通信,并且只能通过这些 channels 发送或接收消息。
- 进程 (Processe):并发执行的实体,它们执行计算并通过 channel 进行通讯。
- 通道 (Channel):通讯的路径,进程可以通过它们发送或接收消息。
- 同步通信:在 CSP 中,通信是同步的。这意味着发送进程会阻塞,直到接收进程准备好接收消息。
- 并发组合:进程可以并发组合,形成更大的并发系统。
Rust 标准库提供了异步 channel(channel
),以及同步 channel(sync_channel
)。同步 Channel 中可以设置缓冲区,如果发送的消息大于缓冲区设置的值(N),那么第 N + 1 条消息就会阻塞。
以下是 CSP 模型的图以及实现:
需要启动:mini-redis-server
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
value: Bytes,
resp: Responder<()>,
},
}
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Clone a second transmitter, corresponding to Get and Set requests
let tx2 = tx.clone();
// Client task
let manager = tokio::spawn(async move {
let mut client = mini_redis::client::connect("127.0.0.1:6379").await.unwrap();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
let _ = resp.send(res);
}
Command::Set { key, value, resp } => {
let res = client.set(&key, value).await;
let _ = resp.send(res);
}
}
}
});
// Spawn two tasks, one setting a value and other getting a value
let t1 = tokio::spawn(async move {
// Create a oneshot channel
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// Send the GET request
if tx.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT {{Get}} = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "hello".to_string(),
value: "world".into(),
resp: resp_tx,
};
if tx2.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
let res = resp_rx.await;
println!("GOT {{Set}} = {:?}", res);
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
参考:https://course.rs/advance/concurrency-with-threads/message-passing.html