为什么Rust不做green threading#

Send 和 Sync#

Rust中几乎所有的并发特性都是标准库或者第三方库提供的,真正由Rust语言本身提供的很少。而std::marker中的traits SendSync算是其中一个。

**实现Send的类型值的所有权可以在线程间传递。**Rust中绝大多数类型都实现了Send,但也有些例外,如Rc<T>:因为如果将Rc<T>的拷贝值的所有权在多个线程中传递,Rust无法保证Rc<T>引用值的正确性。

实现Sync的类型表示该类型值可以在多个线程中被引用。也就是说,如果&T实现了Send,那么类型T就是Sync

SendSync markers其实就是将其他语言中的一些潜规则显式地标明出来,让编译器提前检查出代码中的隐患。

线程原语#

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

thread::spawn新建一个线程,执行传递的闭包函数,返回一个JoinHandler,可以在主线程中调用join等待子线程结束。move用于强制闭包获取它使用的变量的所有权。

我们可以看看thread::spawn的实现:

#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}

spawn的入参和返回值都实现Send,同时其生命周期为'static。这是因为在多线程中,每个线程的执行周期不是同步的。父线程或者子线程的结束都会导致入参或者返回值的生命周期不满足Rust的约束条件。

Channel#

Rust实现了一个multi-producer, single-consumer的channel,用来在多线程间进行消息传递。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

每个tx只能在一个线程中使用,如果需要多个生产者,需要用tx.clone()拷贝生产者。

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

但是,消费者端是不能被拷贝的,所以是一个mpsc的channel。

共享变量#

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = counter.clone();
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

ArcRc的多线程版本,让多个线程中的变量拥有同一个值的所有权。ArcRc一样,也只允许对值进行只读共享。如果需要对值进行修改,需要再包一层MutexRwLock或者Atomic类型。

Mutex也实现了SendSync

unsafe impl<R: RawMutex + Send, T: ?Sized + Send> Send for Mutex<R, T> {}
unsafe impl<R: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<R, T> {}