Rust async/await 内部是怎么实现的

同事问 Rust aysnc/await 是怎么实现的呢,在 await 的地方停住,之后又在继续的时候继续恢复(当前线程/coroutine)的执行,也是用了 yield/generator 这样的东西?

简单的试了下,猜测大概是这样吧。

如下代码:

1
2
3
4
5
6
7
8
9
10
async fn say_world() {
    println!("hello world");
}

#[tokio::main]
async fn main() {
    let op = say_world();

    op.await;
}

使用 nightly 的 rustc “编译”:

1
$ cargo rustc -- -Z unpretty=hir

下面是输出结果,这里只显示了 main() 函数相关处理后的代码(修改过格式后):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(#[lang = "from_generator"](|mut _task_context|
{
    let op = say_world();
    match op
        {
        mut pinned => loop {
            match unsafe
                  {
                      #[lang = "poll"](#[lang = "new_unchecked"](&mut pinned),
                                       #[lang = "get_context"](_task_context))
                  }
                {
                    #[lang = "Ready"] {
                    0: result
                    } =>
                    break result,
                    #[lang = "Pending"] { } =>
                    {
                    }
                }

            _task_context = (yield());
        },
    };
}))

抛去那么多 attribute ,大概流程就是不挺的 loop ,查看 Future(这里的 op) 是否 ready。如果已经是 ready 的状态,那么就会对该结果进行处理,然后退出;否则(Pending的状态)就继续等待,让 runtime 调度其他 task 。

Future 在 tokio 里就“是”一个 task(确切说是 future.await?),tokio runtime 负责调度 task ,task 有些像 goroutine,不过 Rust 本身不自带 runtime 的实现。

根据这里对 await!的说明:

1
2
3
4
5
6
7
8
let mut future = IntoFuture::into_future($expression);
let mut pin = unsafe { Pin::new_unchecked(&mut future) };
loop {
    match Future::poll(Pin::borrow(&mut pin), &mut ctx) {
          Poll::Ready(item) => break item,
          Poll::Pending     => yield,
    }
}

以及这里

1
2
3
4
5
6
7
8
9
10
11
12
13
#[async]
fn print_lines() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let tcp = await!(TcpStream::connect(&addr))?;
    let io = BufReader::new(tcp);

    #[async]
    for line in io.lines() {
        println!("{}", line);
    }

    Ok(())
}

上面代码经过“翻译”后,会类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fn print_lines() -> impl Future<Item = (), Error = io::Error> {
    CoroutineToFuture(|| {
        let addr = "127.0.0.1:8080".parse().unwrap();
        let tcp = {
            let mut future = TcpStream::connect(&addr);
            loop {
                match future.poll() {
                    Ok(Async::Ready(e)) => break Ok(e),
                    Ok(Async::NotReady) => yield,
                    Err(e) => break Err(e),
                }
            }
        }?;

        let io = BufReader::new(tcp);

        let mut stream = io.lines();
        loop {
            let line = {
                match stream.poll()? {
                    Async::Ready(Some(e)) => e,
                    Async::Ready(None) => break,
                    Async::NotReady => {
                        yield;
                        continue
                    }
                }
            };
            println!("{}", line);
        }

        Ok(())
    })
}

Note: 上面代码 poll 结果还有 NotReady,应该是 RFC 更新不及时吧,最新版的 Future 应该都是 Pendding了。

从上面两处说明,我们也可以大概了解这种 generator 机制了:Ready 的时候返回结果,Pending 的时候让出调度。

今天只是大致搜了下资料,抛出了这样一个问题。下一步计划再确认下 tokio 的实现,看看它到底是怎么做的。