Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
【譯】Tokio 內部機制:從頭理解 Rust 非同步 I/O 框架

本文譯自 Tokio internals: Understanding Rust's asynchronous I/O framework from the bottom up
Thanks David Simmons for this awesome article!

Tokio 是 Rust 的開發框架,用於開發非同步 I/O 程式(asynchronous I/O,一種事件驅動的作法,可實現比傳統同步 I/O 更好的延伸性、效能與資源利用)。可惜的是,Tokio 過於精密的抽象設計,招致難以學習的惡名。即使我讀完教程後,依然不認為自己充分內化這些抽象層,以便推斷實際發生的事情。

從前的非同步 I/O 相關開發經驗甚至阻礙我學習 Tokio。我習慣使用作業系統提供的 selection 工具(例如 Linux epoll)當作起點,再轉移至 dispatch、state machine 等等。倘若直接從 Tokio 抽象層出發,卻沒有清楚了解 epoll_wait() 在何處及如何發生,我會覺得難以連結每個概念。Tokio 與 future-driven 的方法就好像一個黑盒子。

我決定不繼續由上而下的方法學習 Tokio,反其道而行,而是透過閱讀原始碼,確切理解具體實作是如何驅動從 epoll 事件到 Future::poll() 消耗 I/O 的整個過程。我不會深入高層次的 Tokio 與 futures 使用細節,現有的教程 有更完整詳細的內容。除了簡短的小結,我也不會探討一般性的非同步 I/O 問題,畢竟這些問題都可寫個獨立的主題了。我的目標是有信心讓 futures 與 Tokio 以我所認知的方式執行。

首先,有些重要的聲明。請注意,Tokio 正快速開發中,這裡所見所聞可能不久就會過時。這個研究中我用了 tokio-core 0.1.10futures- 0.1.17mio 0.6.10。由於我想從最底層理解 Tokio,我並不會考慮更高層次的套件如 tokio-prototokio-service。tokio-core 的事件系統本身有許多細節,為了精簡,我會盡量避開這些細項。我在 Linux 作業系統上研究 Tokio,而有些討論細節與作業系統相依,如 epoll。最後,這裡所有東西都是我這個 Tokio 新手的詮釋,可能會有錯誤或誤導。

Asynchronous I/O in a nutshell

同步 I/O 程式會執行阻塞性的 I/O 操作,直到操作完成。例如讀取會阻塞至資料抵達,寫入會阻塞線程直到欲傳遞的 bytes 送達 kernel。這些操作非常適合依序執行的傳統命令式程式設計。舉例來說,一個 HTTP 伺服器替每個新連線產生一個新線程,這個線程會讀取資料並阻塞線程直到接收完整的 request,之後處理請求,再來阻塞線程至資料完全寫入 response。這是個方法非常直觀,缺點是會阻塞線程,因此每個連線的線程要各自獨立,每個線程也需有自己的 stack。然而,線程開銷阻礙了伺服器處理大量連線的可延伸性(參閱 C10k problem),對低階系統來說也不易負荷。

如果 HTTP server 使用非同步 I/O 開發,換句話說,在同一個線程上處理所有 I/O 操作。如此一來,所有活躍的連線以及 socket 監聽都會配置為非阻塞狀態(non-blocking),並在 event loop 中監控讀取與寫入是否就緒,進而在事件發生時分派給對應的處理程式(handler)。而每個連線都需維護自身的狀態與 buffer,如果一個處理程式一次僅能從 200 bytes 的 request 中讀取 100 個位元組(bytes),它就不能等待剩下的 bytes 而造成線程阻塞,處理程式必須將部分資料儲存在 buffer 中,設定當前的狀態為「讀取請求中」,並返回給 event loop。待到下一次連線調用的相同的處理程式,它才可讀取剩餘的 bytes 並將狀態轉為「寫入回應中」。如此的資源管理系統將會非常迅速,但同時也產生更複雜的 state machine 與容易出錯的毛病。

理想中的非同步 I/O 框架應該要提供能寫出近似於同步 I/O 的程式,但底層是 event loop 與 state machine。這對每個語言來說都很不容易,不過 Tokio 的實現已接近了。

The Tokio stack

Tokio 的技術棧由下列幾個部分組成:

  1. The system selector。每個作業系統皆提供接收 I/O 事件的工具,如 epoll(linux)、kqueue()(FreeBSD/macOS),與 IOCP(Windows)。
  2. Mio - Metal I/OMio 是一個 Rust crate,提供低階通用的 I/O API,內部處理特定作業系統的 selector 實作細節,所以你不需再處理這件事。
  3. FuturesFutures 以強大的抽象來表示尚未發生的事物。這些 future 以許多好用的方式組合成另一新的複合 future 來代表一系列複雜的事件。這個抽象層足以通用於許多 I/O 之外的事件,但在 Tokio 中 ,我們專注在利用 futures 開發非同步 I/O state machines。
  4. Tokiotokio-core 提供一個中心的 event loop,這個 event loop 整合 Mio 回應 I/O 事件,並驅動 futures 完成(completion)。
  5. Your program。一個採用 Tokio 框架的程式,會以 futures 操作非同步 I/O,並將這些 futures 傳遞給 Tokio 的 event loop 來執行。

Mio: Metal I/O

Mio 旨在提供一系列低階的 I/O API,允許調用端接收事件,如 socket 讀寫就緒狀態(readiness state)改變等。重點如下:

  1. Poll 與 Evented。Mio 提供 Evented trait 來表示任何可當作事件來源的事物。在你的 event loop 中,你會利用 mio::Poll 物件註冊一定數量的 Evented,再調用 mio::Poll::poll 來阻塞 loop,直到一至多個 Evented 產生事件(或超時)。

  2. System selector。Mio 提供可跨平台的 system selector 訪問,所以 Linux epoll、Windows IOCP、FreeBSD/macOS kqueue(),甚至許多有潛力的平台都可調用相同的 API。不同平台使用 Mio API 的開銷不盡相同。由於 Mio 是提供基於 readiness(就緒狀態)的 API,與 Linux epoll 相似,不少 API 在 Linux 上都可以一對一映射。(例如:mio::Events 實質上是一個 struct epoll_event 陣列。)對比之下,Windows IOCP 是基於完成(completion-based)而非基於 readiness 的 API,所以兩者間會需要較多橋接。Mio 同時提供自身版本的 std::net struct 如 TcpListenerTcpStreamUdpSocket。這些 API 封裝 std::net 版本的 API,預設為非阻塞且提供 Evented 實作讓其將 socket 加入 system selector。

  3. Non-system events。Mio 除了提供從 I/O 所得的 readiness 狀態來源,也可以用來指示從 user-space 來的 readiness 事件(非系統事件)。舉例來說,當一個工作線程(worker thread)完成一單位的工作,它就可以向 event loop 發出完成信號。你的程式調用 Registration::new2() 以取得一個 (Registration, SetReadiness) 元組。Registration 是一個實作 Evented 且藉由 Mio 註冊在 event loop 的物件;而需要指示當前 readiness 狀態時,則會調用 SetReadiness::set_readiness。在 Linux 上,非系統事件通知以 pipe 實作,當調用 SetReadiness::set_readiness() 時,會將 0x01 這個位元組寫入 pipe 中。而 mio::Poll 底層的 epoll 會配置為監控 pipe 讀取結束,所以 epoll_wait() 會解除阻塞,而 Mio 就可以將事件傳遞到調用端。另外,無論註冊多少非系統事件,都只會在 Poll 實例化時建立唯一一個 pipe。

每個 Evented 的註冊皆與一個由調用端提供 usize 型別的 mio::Token 綁定,這個 token 將會與事件一起返回,以指示出對應的註冊資訊。這種作法很好地映射到 Linux 的 system selector,因為 token 可以放置在 64-bit 的 epoll_data union 中,並保持相同的功能。

這裡提供一個 Mio 操作的實際案例,下面是我們在 Linux 上使用 Mio 監控一個 UDP socket 的情況:

  1. 建立 socket

    let socket = mio::net::UdpSocket::bind(
        &SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::new(127,0,0,1)),
            2000
        )
    ).unwrap();

    建立一個 Linux UDP socket,其中封裝一個 std::net::UdpSocket,再封裝在 mio::net::UdpSocket 中。這個 socket 為非阻塞性(non-blocking)。

  2. 建立 poll 實例

    let poll = mio::Poll::new().unwrap();

    在這步驟,Mio 初始化 system selector、readiness 佇列(用於非系統事件),以及併發保護。當 readiness 佇列初始化時,會建立一個 pipe,讓 readiness 從 user-space 發出信號,而這個 pipe 的檔案描述符(file descriptor)會加入 epoll 中。每個 Poll 物件建立時,都會賦予一個獨特、遞增的 selector_id

  3. 透過 poll 註冊 socket

    poll.register(
        &socket,
        mio::Token(0),
        mio::Ready::readable(),
        mio::PollOpt::level()
    ).unwrap();

    UdpSocketEvented::register() 被調用時,會將代理指向一個封裝的 EventedFd,這個 EventedFd 會將 socket 的 file descriptor 加入 poll selector 中(最終會調用 epoll_ctl(fepd, EPOLL_CTL_ADD, fd, &epoll_event),而 epoll_event.data 設置為傳入的 token 值)。當一個 UdpSocket 註冊後,selector_id 會設置到與傳入的 Poll 相同,從而與 selector 產生連結。

  4. 在 event loop 中呼叫 poll()

    loop {
        poll.poll(&mut events, None).unwrap();
        for event in &events {
            handle_event(event);
        }
    }

    System selector(epoll_wait())與 readiness 佇列將會輪詢(poll)新的事件。(epoll_wait() 會阻塞,但由於非系統事件是透過 pipe 出發 epoll,事件仍會即時處理。)這一系列組合的事件可供調用端處理。

Futures and Tasks

Futures 是從函數式程式設計借來的技術,一個尚未完成的運算會以一個 future 代表,而這些獨立的 future 可以組合起來,開發更複雜的系統。這個概念對非同步 I/O 非常中用,因為在處理交易(transaction)的所有基礎步驟,都可以模化為合成 futures(combinded futures)。以 HTTP 伺服器為例,一個 future 讀取 request,會從接收到有效資料開始讀取到 request 結束,另一個 future 則會處理這個 request 並產生 response,再另一個 future 則會寫入 responses。

在 Rust 中,futures crate 實現了 futures。你可以透過實作 Future trait 來定義自己的 future,這個 trait 需實現 poll() 方法,這個方法會在需要時調用,允許 future 開始執行。poll() 方法會回傳一個錯誤(error),或回傳一個指示告知 future 仍在處理,或是當 future 完成時返回一個值。Future trait 也提供許多組合操作子(combinator)作為預設方法。

欲理解 futures,須先探討三個重要的概念:tasksexecutors,以及 notifications,且需理解此三者該如何安排,才能在正確時間點調用 future 的 poll() 方法。每一個 future 都在一個 task 語彙環境(context)中執行。一個 task 只與一個 future 關聯,而這個 future 卻可能是一個合成的 future,驅動其他封裝的 futures。(舉例來說,多個 future 用 join_all() 組合操作子,串連成單一一個 future,或是兩個 future 利用 and_then() 組合操作子來依序執行。)

Task 與它的 futures 需要被一個 executor 執行。一個 executor 的責任是在正確時間點輪詢 task/future,輪詢通常會在接收到執行進度開始的通知時。而這個通知將在一個實作 futures::executor::Notify trait 的物件調用 notify 時發布。這裡有個例子,是由 futures crate 所提供的非常簡單的 executor,在調用 future 上的 wait() 被呼叫。擷自原始碼

/// Waits for the internal future to complete, blocking this thread's
/// execution until it does.
///
/// This function will call `poll_future` in a loop, waiting for the future
/// to complete. When a future cannot make progress it will use
/// `thread::park` to block the current thread.
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
    ThreadNotify::with_current(|notify| {

        loop {
            match self.poll_future_notify(notify, 0)? {
                Async::NotReady => notify.park(),
                Async::Ready(e) => return Ok(e),
            }
        }
    })
}

給定一個融合 task 與 future 的 futures::executor::Spawn 物件,這個 executor 在迴圈中調用 poll_future_notify。這個 Notify 會成為 task 執行語彙環境的一部分,future 也會被輪詢。如果一個 future poll 方法回傳 Async::NotReady,表示 future 仍等待中,必須在往後再次輪詢。Notify object 會從 futures::task::current() 取得一個指向 task 的 handle,且在 future 有些進展時調用 notify() 方法。(當一個 future 被輪詢時,與該 future 相關的 task 訊息將會儲存到 thread-local 中,thread-local 可以透過 current() 存取取得。)上例中,如果輪詢回傳 Async::NotReady,executor 會阻塞至接收到通知。也許 future 在其他線程運算,在完成時調用 notify();或是 poll() 方法在返回 Asynx::NotReady 之前,自身直接調用了 notify()(後者並不常見,因為理論上一個 poll() 在返回之前應該持續取得進展)。

Tokio 的 event loop 行為上比簡單整合「 Mio 事件驅動 future 完成」來得精細。舉例來說,一個 Mio event 表示一個 socket 的 readiness(就緒狀態),最後會產生一個通知,足以告知相對應的 future 需要輪詢。

處理 future 時,Task 是最基礎的執行單元,且基本上就是綠色線程,提供協調式多工,允許在同一個系統線程有多個執行語彙環境。當一個 task 無法有所進展,會讓處理器先處理其他可執行的 task。我們必須理解的是,「通知」會發生在 task 層級而非 future 層級。當一個 task 被通知時,它會輪詢它連結的最高層級的 future,這會導致任何或是全部的 child future 同樣被輪詢。例如,如果一個 task 最高層級的 future 是一個以 join_all 組合的十個 future,而其中一個 future 安排要通知此一 task,則無論需不需要,全部十個 future 皆須接受輪詢。

Tokio's interface with Mio

Tokio 利用上述的 Mio 「非系統事件」,將 task 通知轉換為 Mio 的事件。在取得一個 Mio 的 (RegistrationSetReadiness)元組後,Tokio 會將 Registration(一個 Evented)註冊至 Mio 的 poll (event loop)中,再將 SetReadiness 封裝在實作了 Notify trait 的 MySetReadiness 中。原始碼如下:

struct MySetReadiness(mio::SetReadiness);

impl Notify for MySetReadiness {
    fn notify(&self, _id: usize) {
        self.0.set_readiness(mio::Ready::readable())
              .expect("failed to set readiness");
    }
}

在這個作法中,task 的通知將轉換為 Mio 事件,且可以透過 Tokio 的事件處理與分派機制與其他 Mio 事件作伙處理。

如同 Mio 封裝 std::net 內的 UdpSocketTcpListener,以及 TcpStream 來客製化需求,Tokio 也利用了組合(composition)與裝飾(decoration)建立這些型別的 Tokio 版。舉例來說,Tokio 的 UdpSocket 架構大致如下:

Tokio 版本的 I/O 來源型別的建構子都需要傳入 event loop 的 handle(tokio_core::reactor::Handle)。當實例化時,這些型別會將它們的 socket 註冊至 Mio poll 的 event loop 上,以利接收 edge-triggred(譯注:一種 epoll event 的觸發模式)的事件及其新賦予的偶數數字 token(以下會解釋)。當底層的 I/O 操作回傳 WouldBlock 時,這些型別可以很方便地安排當前的 task 來接收讀寫的 readiness。

Tokio 在 Mio 上註冊了許多 Evented 型別,儲存在特定的 token 上:

  • Token 0(TOKEN_MESSAGES:用於 Tokio 內部的消息佇列(message queue),這個佇列提供移除 I/O 來源、接收讀寫 readiness 通知的 task 排程,設定 timeout,以及執行在 event loop 語彙環境中的任意閉包。這個 token 可以安全地從其他線程與 event loop 溝通。例如,Remote::spawn() 透過訊息系統,將 future 送達 event loop。

    實作上,消息佇列是一個 futures::sync::mpsc stream。身為一個 futures::stream::Stream(與 future 類似,但是產生一序列的值而非單一值),消息佇列使用上述 MySetReadiness 方案來處理,而 Registration 則是以 TOKEN_MESSAGES 這個 token 註冊。當接收到 TOKEN_MESSAGES 事件時,該事件會分派到 consume_queue() 方法進一步處理。(原始碼:enum Messageconsume_queue()

  • Token 1(TOKEN_FUTURE:用於通知 Tokio 需要輪詢 main task。這個 token 會在與 main task 相關聯的通知上(也就是傳入 Core::run() 的 future 或它的子 future,而非透過 spawn() 在不同 task 中執行的 future)。這個事件同樣用了 MySetReadiness 方案將 future 轉譯成 Mio 的事件。在一個 future 被 main task 執行前,會先回傳 Async::NotReady,並以其所選的方式在稍後發布通知。當接收了 TOKEN_FUTURE 事件,Tokio event loop 就會再次輪詢 main task。

  • 大於 1 的偶數 token(TOKEN_START + key * 2:用來指示 I/O 來源的 readiness 改變。Token 中的 key 是 Slab key,關聯值是 Core::inner::io_dispatch Slab<ScheduledIo>。當 Mio 的 I/O 來源型別(UdpSocketTcpListenerTcpStream)實例化之初,會自動以此 token 註冊。

  • 大於 1 的奇數 token(TOKEN_START + key * 2 + 1:用來指示一個 spawned task(及其關聯的 future)需要被輪詢。Token 中的 key 是 Slab key,關聯值是 Core::inner::task_dispatch Slab<ScheduledTask>。和 TOKEN_MESSAGESTOKEN_FUTURE 事件相同,這個事件也用了 MySetReadiness 溝通。

Tokio event loop

Tokio,更精確來說是 tokio_core::reactor::Core 提供了 event loop 來管理 futures 和 tasks,驅動 future 完成,以及與 Mio 介接的介面,讓 I/O 事件可正確通知對應的 task。使用 event loop 需透過 Core::new() 實例化一個 Core,並調用 Core::run() 傳入一個 future。這個 event loop 在返回之前,將會驅動傳入的 future 至完成。以伺服器程式來說(serve application),這個 future 很可能生命週期較長,例如使用 TcpListener 持續接收新傳入的連結,每個連結透過 Handle.spawn() 分別建立 task,由自身的 future 獨立處理。

以下的流程圖大略點出 Tokio event loop 的基本輪廓:

What happens when data arrives on a socket?

想了解 Tokio,可以觀察當資料抵達 socket 時,event loop 發生的每個步驟。我很訝異地發現,這個過程最終分為兩部分,分別在 event loop 內各自的迭代中,進行各自的 epoll 交易處理。第一部分負責當 socket 讀取就緒時(例如,Mio 事件帶著比 1 大的偶數 token,或 main task 的 TOKEN_FUTURE),傳送通知到對該 socket 有興趣的 task;第二部分則是透過輪詢 task 與它的 future 來處理通知(例如,Mio 事件帶著比 1 大的奇數 token)。我們來了解以下情境:一個 spawned task 從 Linux 上的 UdpSocket,透過 Tokio event loop 讀取資料,並假設前一次輪詢結果導致 recv_from() 回傳一個 WouldBlock 錯誤。

Tokio event loop 調用 mio::Poll:poll(),該方法轉而調用 epoll_wait()(在 Linux 上)進而阻塞到某個監測中的 file descriptor 發生了 readiness 改變的事件。當上述情形發生後,epoll_wait() 回傳一個 epoll_event structs 的陣列,用以描述發生什麼事,這些 structs 也將透過 Mio 轉譯為 mio::Events,並返回 Tokio。(在 Linux 上,這些轉譯應該是零成本(zero-cost),因為 mio::Events 就只是簡單,以一個 epoll_event 陣列組成的元組結構(tuple struct)。)在我們的例子,假設在陣列中只有一個事件指出 socket 已讀取就緒。由於該事件的 token 是大於 1 的偶數,Tokio 辨識其為 I/O 事件,並從 Slab<ScheduledIo> 中尋找對應的元素,以取得有哪些 task 對這個 socket 的讀寫 readiness 狀態有興趣。接下來,Tokio 會通知對讀取有興趣的 task,這些 task 透過前述的 MySetReadiness,調用 Mio 的 set_readiness()。Mio 會將這個非系統的事件詳細資訊加到 readiness 佇列中,並寫入 0x01 到 readiness pipe 中。

在 Tokio event loop 往下一個迭代前進之前,它會再次輪詢 Mio,Mio 則調用 epoll_wait(),而 epoll_wait() 這次返回一個在 Mio 的 readiness pipe 上發生的讀取 readiness 事件。Mio 讀取之前寫入的 0x01,並從 readiness 佇列取出最前端(dequeue)的非系統事件資料,並將這個事件回傳到 Tokio。由於該事件的 token 是大於 1 的奇數 token,Tokio 辨識其為 task 通知事件,並從 Slab<ScheduledTask> 中尋找對應的元素,以取得 task 從 spawn() 回傳的最原始的 Spawn 物件。接下來,Tokio 透過 poll_future_notify() 輪詢這個 task 與它的 future,這個 future 可能會從 socket 讀取資料,直至得到 WouldBlock 錯誤。

這個二迭代的方法涉及了 pipe 讀寫,對比其他非同步 I/O event loop,可能會有一點額外開銷。如果在一個單線程的程式中,使用 strace 會看到一個線程用 pipe 與自己溝通,很是奇怪:

pipe2([4, 5], O_NONBLOCK|O_CLOEXEC) = 0
...
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=14, u64=14}}], 1024, -1) = 1
write(5, "\1", 1) = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128) = 1
read(4, 0x7ffce1140f58, 128) = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(12, "hello\n", 1024, 0, {sa_family=AF_INET, sin_port=htons(43106), sin_addr=inet_addr("127.0.0.1")}, [16]) = 6
recvfrom(12, 0x7f576621c800, 1024, 0, 0x7ffce1140070, 0x7ffce114011c) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(3, [], 1024, 0) = 0
epoll_wait(3, 0x7f5765b24000, 1024, -1) = -1 EINTR (Interrupted system call)

Mio 選用 pipe 的方案來支持通用性,以防 set_readiness() 可能被其他線程調用。也有可能這種作法對強制實施公平的事件調節與維持 futures 與 I/O 的間接層有所幫助。

Lessons learned: Combining futures vs. spawning futures

最初探索 Tokio 時,我寫了一個小程式,負責監聽不同 UDP socket 進來的資料。這個程式建立十個讀取 socket 的 future 實例,每個都監聽不同的埠口(port)。我天真地使用 join_all() 將所有 future 合成為單一 future,並將之傳入 Core::run(),訝異的是,我發現每當一個封包送達,所有 future 都會輪詢一次。另一個驚訝的點是,tokio_core::net:UdpSocket::recv_from()(以及底層的 PollEvented)非常聰明,當 socket 在前一次的 Mio 輪詢中尚未標記為讀取就緒時,會避免調用作業系統 rectfrom()。以下的 strace 反映出我寫的 future poll() 的除錯 println!(),大致如下:

epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=14, u64=14}}], 1024, -1) = 1
write(5, "\1", 1) = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128) = 1
read(4, 0x7ffc183129d8, 128) = -1 EAGAIN (Resource temporarily unavailable)
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
recvfrom(12, "hello\n", 1024, 0, {sa_family=AF_INET, sin_port=htons(43106), sin_addr=inet_addr("127.0.0.1")}, [16]) = 6
getsockname(12, {sa_family=AF_INET, sin_port=htons(2006), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
write(1, "recv 6 bytes from 127.0.0.1:43106 at 127.0.0.1:2006\n", 52) = 52
recvfrom(12, 0x7f2a11c1c400, 1024, 0, 0x7ffc18312ba0, 0x7ffc18312c4c) = -1 EAGAIN (Resource temporarily unavailable)
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
epoll_wait(3, [], 1024, 0) = 0
epoll_wait(3, 0x7f2a11c36000, 1024, -1) = ...

有鑑於 Tokio 與 futures 的具體內部運作某個程度上對我來說有點隱晦,我想我希望背後有些魔法路由,可以只輪詢必要的 futures。當然,對 Tokio 有更深入的理解後,我的程式很明顯這樣利用 futures:

這的確可以執行,但不夠好,尤其是當你有一拖拉庫 socket 時。由於通知在 task 層級發生,上圖中任意一個綠色方格中通知都會導致 main task 被通知。它將會輪詢 FromAll future 使得所有 FromAll 的 child future 都須接受輪詢。我真正需要的是一個簡單的 main future,使用 Handle::spawn() 來啟動每個封裝在各自的 task 中的 future。這種安排大致如下圖:

當任何 future 安排一個通知,只有該 future 的 task 會收到通知,也只有該 future 會被輪詢(回想一下,「安排一個通知」會自動發生在 tokio_core::net:UdpSocket::rect_from()mio::net::UdpSocket::rect_from() 回傳值中接收到 WouldBlock )。future 組合操作子敘述表達能力強勁,可好整以暇地描述協議(protocol)的流程而不須弄髒手寫手動輪詢的狀態機。然而重要的是,你必須理解你的設計也許需要支援各自獨立,獨自且並行運作的 tasks。(譯注:而非都在 main task 上使用 join_all()

Final thoughts

閱讀 Tokio、Mio 以及 futures 原始碼後,大大幫助我鞏固對 Tokio 的理解,也驗證了透過理解具體實作來釐清抽象層的學習策略。這個方法在僅僅學習抽象層的狹隘使用案例時非常危險,我們必須意識到具體的示例僅是助於理解一般通例。在閱讀完原始碼之後,再次閱讀 Tokio 的教學文件,我有些馬後炮的意見:Tokio 非常合理,應該要很容易理解與上手!

我仍有些問題待日後研究:

  • Tokio 有處理 edge triggering(Linux epoll)的飢餓問題(starvation problem)嗎?我認為這個問題可以在 future 中,以單一一個 poll() 限制讀 / 寫的數量。當達到這個限制時,future 可以在顯式通知當前 task 提前返回,而非依靠 Tokio I/O 來源類型的隱式「WouldBlock 排程」行為。因此這使得其他 task 與 future 有機會有所進展。
  • Tokio 是否不依賴於將工作卸載給工作線程(worker thread)以最大化處理器核心運用,而是直接支援多線程環境下執行 event loop 嗎?

2017-12-19 更新:這裡有 Reddit 對話串討論本文。Mio 的作者 Carl Lerche 在這裡這裡貼了些資訊量充足的留言。除了回應上述問題,他也點出 FuturesUnordered 是一種合成 futures 的方法,只有相關的 child future 會被輪詢,以避免所有 future 像使用 join_all() 全部輪詢,不過這方法有些額外的記憶體配置開銷要衡量。另外,未來的 Tokio 將要遷離 mio::Registration 的通知方案,目的是簡化前述一些步驟。

2017-12-21 更新:看起來 Hacker News 也有在討論這篇文章

@bsed

This comment has been minimized.

Copy link

bsed commented Jul 9, 2019

can i reprint it?

@weihanglo

This comment has been minimized.

Copy link
Owner Author

weihanglo commented Jul 11, 2019

@bsed Ok. Just follow Creative commons license.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.