--- url: https://tokio.rs/ description: Tokio 是 Rust 生态系统中最流行的异步运行时,提供事件驱动、非阻塞的 I/O 平台。它不是 Web 框架,而是异步编程的基础设施,类似于 Node.js 的事件循环或 Go 的 runtime。 --- # Tokio 完整文档 ## 简介 Tokio 是 Rust 异步编程的核心运行时,提供: - **异步任务调度器** - 高效的工作窃取调度 - **异步 I/O** - 非阻塞的 TCP/UDP/文件操作 - **定时器** - 延迟、超时、周期定时器 - **同步原语** - 异步 Mutex、Channel、Semaphore 等 ### 版本要求 - Rust 版本 >= 1.70 - 当前最新版本:1.x ### 快速安装 ```toml [dependencies] tokio = { version = "1", features = ["full"] } ``` ### 特性标志 | 特性 | 说明 | |------|------| | `rt` | 运行时核心 | | `rt-multi-thread` | 多线程运行时 | | `io-util` | I/O 工具方法 | | `net` | 网络功能(TCP/UDP) | | `time` | 定时器功能 | | `sync` | 同步原语 | | `fs` | 文件系统操作 | | `macros` | `#[tokio::main]` 等宏 | | `full` | 启用所有特性 | --- # 第一部分:Runtime 核心架构 ## 1.1 Runtime 类型 Tokio 提供两种运行时模式: ### Current-Thread Runtime(单线程运行时) ```rust use tokio::runtime::Builder; let rt = Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async { println!("Running on current thread"); }); ``` **适用场景:** - 轻量级应用 - 需要支持 `!Send` 类型 - 测试环境 ### Multi-Thread Runtime(多线程运行时) ```rust use tokio::runtime::Builder; let rt = Builder::new_multi_thread() .worker_threads(4) .enable_all() .build() .unwrap(); rt.block_on(async { tokio::spawn(async { println!("Running on worker thread"); }).await.unwrap(); }); ``` **适用场景:** - 生产环境(推荐) - 需要真正的并行执行 ### 对比表 | 特性 | current_thread | multi_thread | |------|----------------|--------------| | 线程数 | 1 | 可配置(默认=CPU核心数) | | 任务并行 | 否(并发) | 是(真正并行) | | 支持 `!Send` | 配合 LocalSet | 否 | | 工作窃取 | 否 | 是 | --- ## 1.2 #[tokio::main] 宏 将 async main 函数转换为同步函数: ```rust // 使用宏 #[tokio::main] async fn main() { println!("Hello world"); } // 等效于 fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { println!("Hello world"); }) } ``` ### 配置选项 ```rust // 单线程运行时 #[tokio::main(flavor = "current_thread")] async fn main() {} // 多线程,指定 worker 数量 #[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn main() {} // 测试用:启动时暂停时间 #[tokio::main(start_paused = true)] async fn main() {} ``` --- ## 1.3 Runtime Builder 配置 ```rust use tokio::runtime::Builder; use std::time::Duration; let runtime = Builder::new_multi_thread() // 线程配置 .worker_threads(4) // worker 线程数 .max_blocking_threads(512) // 阻塞线程池最大数量 .thread_name("my-worker") // 线程名称 .thread_stack_size(3 * 1024 * 1024) // 线程栈大小 // 资源驱动 .enable_io() // 启用 I/O 驱动 .enable_time() // 启用时间驱动 .enable_all() // 启用所有 // 回调钩子 .on_thread_start(|| println!("Thread started")) .on_thread_stop(|| println!("Thread stopped")) .build() .unwrap(); ``` --- ## 1.4 block_on 与 spawn 的区别 | 特性 | `block_on` | `spawn` | |------|------------|---------| | 调用位置 | 同步上下文 | 异步上下文 | | 阻塞行为 | 阻塞当前线程 | 立即返回 JoinHandle | | 返回值 | Future 的输出值 | `JoinHandle` | | 用途 | 启动异步程序 | 并发执行子任务 | ```rust use tokio::runtime::Runtime; fn main() { let rt = Runtime::new().unwrap(); // block_on: 同步世界到异步世界的桥梁 let result = rt.block_on(async { // spawn: 在运行时中并发执行任务 let handle = tokio::spawn(async { 42 }); handle.await.unwrap() }); println!("Result: {}", result); } ``` --- ## 1.5 工作窃取调度 多线程运行时使用工作窃取算法实现负载均衡: ``` Worker 0 Worker 1 (空闲) Worker 2 ┌──────┐ ┌──────┐ ┌──────┐ │Task A│ │ │ │Task D│ │Task B│ ◄───窃取───│ IDLE │───窃取───► │Task E│ │Task C│ │ │ │Task F│ └──────┘ └──────┘ └──────┘ ``` **队列结构:** - LIFO Slot: 优化任务局部性 - Local Queue: 每个 worker 256 个槽位 - Global Queue: 存放溢出任务和外部 spawn 的任务 --- # 第二部分:Task 任务系统 ## 2.1 tokio::spawn() 创建新的异步任务,立即开始执行: ```rust #[tokio::main] async fn main() { let handle = tokio::spawn(async { println!("Task running"); 42 }); let result = handle.await.unwrap(); println!("Result: {}", result); } ``` ### 使用 move 捕获变量 ```rust use tokio::net::TcpStream; async fn handle_connection(socket: TcpStream) { // 使用 async move 转移所有权 tokio::spawn(async move { // socket 的所有权已转移到此任务 process(socket).await; }); } ``` ### 'static 生命周期约束 spawn 要求 Future 是 `'static` 的: ```rust use std::sync::Arc; async fn example() { let data = Arc::new(vec![1, 2, 3]); for i in 0..3 { let data_clone = Arc::clone(&data); tokio::spawn(async move { println!("Task {}: {:?}", i, data_clone); }); } } ``` --- ## 2.2 JoinHandle `JoinHandle` 用于等待任务完成和获取结果: ```rust use tokio::task::JoinHandle; #[tokio::main] async fn main() { let handle: JoinHandle = tokio::spawn(async { 5 + 3 }); match handle.await { Ok(value) => println!("Task returned: {}", value), Err(e) if e.is_panic() => println!("Task panicked!"), Err(e) if e.is_cancelled() => println!("Task cancelled"), Err(e) => println!("Task failed: {:?}", e), } } ``` --- ## 2.3 JoinSet 批量任务管理 管理一组任务,按完成顺序获取结果: ```rust use tokio::task::JoinSet; #[tokio::main] async fn main() { let mut set = JoinSet::new(); for i in 0..10 { set.spawn(async move { i }); } while let Some(res) = set.join_next().await { println!("Task completed: {}", res.unwrap()); } } ``` --- ## 2.4 tokio::select! 宏 同时等待多个异步操作,返回第一个完成的: ```rust use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { tokio::select! { _ = sleep(Duration::from_millis(100)) => { println!("Sleep completed first"); } result = async_operation() => { println!("Operation completed: {}", result); } } } async fn async_operation() -> i32 { sleep(Duration::from_millis(50)).await; 42 } ``` ### 带条件的分支 ```rust use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel::(10); let enable_rx = true; tokio::select! { Some(val) = rx.recv(), if enable_rx => { println!("Received: {}", val); } else => { println!("All branches disabled"); } } } ``` ### biased 模式 强制按声明顺序 poll: ```rust tokio::select! { biased; _ = shutdown_signal() => { println!("Shutdown first priority"); break; } msg = rx.recv() => { // 处理消息 } } ``` --- ## 2.5 任务取消 ### 使用 abort() ```rust let handle = tokio::spawn(async { tokio::time::sleep(Duration::from_secs(10)).await; }); handle.abort(); match handle.await { Err(e) if e.is_cancelled() => println!("Task cancelled"), _ => {} } ``` ### 使用 CancellationToken (tokio-util) ```rust use tokio_util::sync::CancellationToken; let token = CancellationToken::new(); let token_clone = token.clone(); let task = tokio::spawn(async move { tokio::select! { _ = token_clone.cancelled() => { println!("Graceful shutdown"); } _ = long_running_task() => {} } }); // 发送取消信号 token.cancel(); ``` --- ## 2.6 spawn_blocking 在专用线程池中执行阻塞操作: ```rust use tokio::task; #[tokio::main] async fn main() { // CPU 密集型计算 let result = task::spawn_blocking(|| { let mut sum = 0u64; for i in 0..10_000_000 { sum += i; } sum }).await.unwrap(); println!("Result: {}", result); } ``` **spawn vs spawn_blocking:** | 特性 | `spawn` | `spawn_blocking` | |------|---------|------------------| | 执行位置 | Worker 线程 | 阻塞线程池 | | 适用场景 | 异步 I/O | CPU 密集/同步阻塞 | | 能否取消 | 可以 | 开始后不能 | | 闭包类型 | `async` 块 | 普通闭包 | --- ## 2.7 task_local! 任务本地存储 ```rust tokio::task_local! { static REQUEST_ID: u32; } #[tokio::main] async fn main() { REQUEST_ID.scope(42, async { println!("Request ID: {}", REQUEST_ID.get()); }).await; } ``` --- # 第三部分:异步 I/O 模块 ## 3.1 AsyncRead 和 AsyncWrite 核心异步 I/O trait: ```rust pub trait AsyncRead { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll>; } pub trait AsyncWrite { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll>; fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } ``` --- ## 3.2 AsyncReadExt 和 AsyncWriteExt 扩展方法提供便捷的异步 I/O 操作: ### 读取方法 ```rust use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; async fn read_examples() -> std::io::Result<()> { let mut stream = TcpStream::connect("127.0.0.1:8080").await?; // read - 读取部分数据 let mut buf = [0u8; 1024]; let n = stream.read(&mut buf).await?; // read_exact - 精确读取指定字节 let mut exact_buf = [0u8; 4]; stream.read_exact(&mut exact_buf).await?; // read_to_end - 读取所有数据 let mut all_data = Vec::new(); stream.read_to_end(&mut all_data).await?; // read_to_string - 读取为字符串 let mut content = String::new(); stream.read_to_string(&mut content).await?; // 读取数值(大端序) let value = stream.read_u32().await?; // 读取数值(小端序) let value_le = stream.read_u32_le().await?; Ok(()) } ``` ### 写入方法 ```rust async fn write_examples() -> std::io::Result<()> { let mut stream = TcpStream::connect("127.0.0.1:8080").await?; // write - 写入部分数据 let n = stream.write(b"Hello").await?; // write_all - 写入所有数据 stream.write_all(b"Complete message").await?; // 写入数值 stream.write_u32(0xDEADBEEF).await?; // flush - 刷新缓冲区 stream.flush().await?; // shutdown - 关闭写入端 stream.shutdown().await?; Ok(()) } ``` --- ## 3.3 BufReader 和 BufWriter 缓冲 I/O 减少系统调用: ```rust use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::fs::File; async fn buffered_read() -> std::io::Result<()> { let file = File::open("data.txt").await?; let reader = BufReader::new(file); let mut lines = reader.lines(); while let Some(line) = lines.next_line().await? { println!("Line: {}", line); } Ok(()) } ``` ```rust use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::fs::File; async fn buffered_write() -> std::io::Result<()> { let file = File::create("output.txt").await?; let mut writer = BufWriter::new(file); writer.write_all(b"Line 1\n").await?; writer.write_all(b"Line 2\n").await?; writer.flush().await?; // 必须调用 flush Ok(()) } ``` --- ## 3.4 copy 和 copy_bidirectional ### 单向复制 ```rust use tokio::io; use tokio::fs::File; use tokio::net::TcpStream; async fn copy_file_to_network() -> io::Result { let mut file = File::open("data.txt").await?; let mut stream = TcpStream::connect("127.0.0.1:8080").await?; io::copy(&mut file, &mut stream).await } ``` ### 双向复制(代理模式) ```rust use tokio::io::copy_bidirectional; use tokio::net::TcpStream; async fn proxy(client: &mut TcpStream, server: &mut TcpStream) -> io::Result<(u64, u64)> { // 返回 (client->server 字节数, server->client 字节数) copy_bidirectional(client, server).await } ``` --- ## 3.5 tokio::fs 文件操作 ```rust use tokio::fs; async fn file_operations() -> std::io::Result<()> { // 读写文件 let content = fs::read_to_string("config.json").await?; fs::write("output.txt", b"Content").await?; // 目录操作 fs::create_dir_all("path/to/dir").await?; // 文件操作 fs::copy("src.txt", "dst.txt").await?; fs::rename("old.txt", "new.txt").await?; fs::remove_file("file.txt").await?; // 遍历目录 let mut entries = fs::read_dir(".").await?; while let Some(entry) = entries.next_entry().await? { println!("{}", entry.path().display()); } Ok(()) } ``` --- # 第四部分:同步原语 ## 4.1 tokio::sync::Mutex 异步互斥锁,跨 await 点安全: ```rust use tokio::sync::Mutex; use std::sync::Arc; #[tokio::main] async fn main() { let data = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let data = Arc::clone(&data); handles.push(tokio::spawn(async move { let mut lock = data.lock().await; *lock += 1; })); } for handle in handles { handle.await.unwrap(); } println!("Result: {}", *data.lock().await); } ``` ### Mutex 方法 | 方法 | 说明 | |------|------| | `lock().await` | 异步获取锁 | | `try_lock()` | 非阻塞尝试获取 | | `lock_owned().await` | 获取带所有权的锁 | | `blocking_lock()` | 同步阻塞获取(阻塞线程池中使用) | ### tokio::sync::Mutex vs std::sync::Mutex | 特性 | tokio::sync::Mutex | std::sync::Mutex | |------|-------------------|------------------| | 阻塞方式 | 异步等待 | 阻塞线程 | | 跨 await | 安全 | 不安全 | | 性能 | 较低 | 较高 | | 公平性 | 公平(FIFO) | 不保证 | **选择建议:** - 锁持有时间长或跨 await → `tokio::sync::Mutex` - 锁持有时间短且不跨 await → `std::sync::Mutex` --- ## 4.2 RwLock 读写锁 ```rust use tokio::sync::RwLock; use std::sync::Arc; #[tokio::main] async fn main() { let data = Arc::new(RwLock::new(vec![1, 2, 3])); // 多个读者可以并发 let readers: Vec<_> = (0..5).map(|i| { let data = Arc::clone(&data); tokio::spawn(async move { let guard = data.read().await; println!("Reader {}: {:?}", i, *guard); }) }).collect(); // 写者独占 { let mut guard = data.write().await; guard.push(4); } for r in readers { r.await.unwrap(); } } ``` --- ## 4.3 Semaphore 信号量 限制并发访问数量: ```rust use tokio::sync::Semaphore; use std::sync::Arc; #[tokio::main] async fn main() { let semaphore = Arc::new(Semaphore::new(3)); // 最多 3 个并发 let mut handles = vec![]; for i in 0..10 { let permit = semaphore.clone().acquire_owned().await.unwrap(); handles.push(tokio::spawn(async move { println!("Task {} running", i); tokio::time::sleep(Duration::from_millis(100)).await; drop(permit); // 释放许可 })); } for handle in handles { handle.await.unwrap(); } } ``` --- ## 4.4 Notify 通知 等待和发送通知: ```rust use tokio::sync::Notify; use std::sync::Arc; #[tokio::main] async fn main() { let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(100)).await; notify_clone.notify_one(); // 唤醒一个等待者 }); notify.notified().await; println!("Notified!"); } ``` --- ## 4.5 Channel 通道 ### mpsc - 多生产者单消费者 ```rust use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel::(32); tokio::spawn(async move { for i in 0..10 { tx.send(i).await.unwrap(); } }); while let Some(value) = rx.recv().await { println!("Received: {}", value); } } ``` ### oneshot - 一次性通道 ```rust use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx, rx) = oneshot::channel::(); tokio::spawn(async move { tx.send("Hello".to_string()).unwrap(); }); let result = rx.await.unwrap(); println!("Received: {}", result); } ``` ### broadcast - 广播通道 ```rust use tokio::sync::broadcast; #[tokio::main] async fn main() { let (tx, mut rx1) = broadcast::channel::(16); let mut rx2 = tx.subscribe(); tx.send(1).unwrap(); tx.send(2).unwrap(); println!("rx1: {}", rx1.recv().await.unwrap()); println!("rx2: {}", rx2.recv().await.unwrap()); } ``` ### watch - 监视通道 ```rust use tokio::sync::watch; #[tokio::main] async fn main() { let (tx, mut rx) = watch::channel("initial"); tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(100)).await; tx.send("updated").unwrap(); }); // 等待值变化 rx.changed().await.unwrap(); println!("New value: {}", *rx.borrow()); } ``` ### 通道选择指南 | 通道类型 | 生产者 | 消费者 | 用例 | |---------|--------|--------|------| | mpsc | 多个 | 1个 | 任务队列、消息传递 | | oneshot | 1个 | 1个 | 请求-响应、结果返回 | | broadcast | 多个 | 多个 | 事件广播、发布订阅 | | watch | 1个 | 多个 | 配置更新、状态共享 | --- # 第五部分:网络模块 ## 5.1 TcpListener 和 TcpStream ### TCP 服务器 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main] async fn main() -> Result<(), Box> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (mut socket, addr) = listener.accept().await?; println!("Connection from: {}", addr); tokio::spawn(async move { let mut buf = [0; 1024]; loop { let n = match socket.read(&mut buf).await { Ok(0) => return, Ok(n) => n, Err(_) => return, }; if socket.write_all(&buf[..n]).await.is_err() { return; } } }); } } ``` ### TCP 客户端 ```rust use tokio::net::TcpStream; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main] async fn main() -> Result<(), Box> { let mut stream = TcpStream::connect("127.0.0.1:8080").await?; stream.write_all(b"Hello, server!").await?; let mut buf = [0; 1024]; let n = stream.read(&mut buf).await?; println!("Response: {}", String::from_utf8_lossy(&buf[..n])); Ok(()) } ``` --- ## 5.2 UdpSocket ```rust use tokio::net::UdpSocket; #[tokio::main] async fn main() -> Result<(), Box> { // 服务器 let server = UdpSocket::bind("127.0.0.1:8080").await?; // 客户端 let client = UdpSocket::bind("127.0.0.1:0").await?; client.connect("127.0.0.1:8080").await?; // 发送 client.send(b"Hello UDP").await?; // 接收 let mut buf = [0; 1024]; let (n, addr) = server.recv_from(&mut buf).await?; println!("Received {} bytes from {}", n, addr); Ok(()) } ``` --- ## 5.3 TCP 配置选项 ```rust use tokio::net::TcpSocket; async fn configured_listener() -> std::io::Result { let socket = TcpSocket::new_v4()?; // 配置选项 socket.set_reuseaddr(true)?; socket.set_reuseport(true)?; // Linux only socket.set_nodelay(true)?; // 禁用 Nagle socket.set_keepalive(true)?; // TCP 保活 // 缓冲区大小 socket.set_send_buffer_size(64 * 1024)?; socket.set_recv_buffer_size(64 * 1024)?; // 绑定并监听 socket.bind("127.0.0.1:8080".parse()?)?; socket.listen(1024) } ``` --- ## 5.4 优雅关闭 ```rust use tokio::sync::broadcast; use tokio::signal; use tokio::net::TcpListener; #[tokio::main] async fn main() -> Result<(), Box> { let listener = TcpListener::bind("127.0.0.1:8080").await?; let (shutdown_tx, _) = broadcast::channel::<()>(1); loop { let mut shutdown_rx = shutdown_tx.subscribe(); tokio::select! { result = listener.accept() => { let (socket, _) = result?; let mut shutdown_rx = shutdown_tx.subscribe(); tokio::spawn(async move { tokio::select! { _ = handle_connection(socket) => {} _ = shutdown_rx.recv() => { println!("Graceful shutdown"); } } }); } _ = signal::ctrl_c() => { println!("Shutdown signal received"); let _ = shutdown_tx.send(()); break; } } } Ok(()) } ``` --- # 第六部分:定时器模块 ## 6.1 sleep ```rust use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { println!("Starting"); sleep(Duration::from_secs(1)).await; println!("1 second later"); } ``` --- ## 6.2 timeout 为 Future 添加超时限制: ```rust use tokio::time::{timeout, Duration}; #[tokio::main] async fn main() { let result = timeout(Duration::from_secs(1), async_operation()).await; match result { Ok(value) => println!("Completed: {:?}", value), Err(_) => println!("Timed out"), } } async fn async_operation() -> String { tokio::time::sleep(Duration::from_secs(2)).await; "Done".to_string() } ``` --- ## 6.3 interval 周期定时器: ```rust use tokio::time::{interval, Duration}; #[tokio::main] async fn main() { let mut interval = interval(Duration::from_millis(100)); for i in 0..5 { interval.tick().await; println!("Tick {}", i); } } ``` ### MissedTickBehavior 处理错过 tick 的策略: ```rust use tokio::time::{interval, Duration, MissedTickBehavior}; let mut interval = interval(Duration::from_millis(50)); // Burst(默认):快速追赶所有错过的 tick interval.set_missed_tick_behavior(MissedTickBehavior::Burst); // Skip:跳过错过的 tick interval.set_missed_tick_behavior(MissedTickBehavior::Skip); // Delay:从当前时间重新计算 interval.set_missed_tick_behavior(MissedTickBehavior::Delay); ``` --- ## 6.4 与 select! 配合 ```rust use tokio::time::{sleep, interval, Duration, Instant}; async fn timeout_loop() { let timeout_duration = Duration::from_secs(5); let sleep = sleep(timeout_duration); tokio::pin!(sleep); let mut heartbeat = interval(Duration::from_secs(1)); loop { tokio::select! { _ = &mut sleep => { println!("Timeout!"); break; } _ = heartbeat.tick() => { println!("Heartbeat"); // 重置超时 sleep.as_mut().reset(Instant::now() + timeout_duration); } } } } ``` --- ## 6.5 测试用时间控制 ```rust #[cfg(test)] mod tests { use tokio::time::{self, Duration, Instant}; #[tokio::test(start_paused = true)] async fn test_with_paused_time() { let start = Instant::now(); // 即使 sleep 10 秒,测试也会立即完成 tokio::time::sleep(Duration::from_secs(10)).await; assert!(start.elapsed() >= Duration::from_secs(10)); } #[tokio::test(flavor = "current_thread")] async fn test_manual_advance() { time::pause(); let start = Instant::now(); time::advance(Duration::from_secs(5)).await; assert_eq!(start.elapsed(), Duration::from_secs(5)); } } ``` --- # 第七部分:最佳实践 ## 7.1 不要在异步代码中阻塞 ```rust // 错误:阻塞 worker 线程 async fn bad() { std::thread::sleep(Duration::from_secs(1)); // 不要这样做! } // 正确:使用异步 sleep async fn good() { tokio::time::sleep(Duration::from_secs(1)).await; } // 正确:阻塞操作放入 spawn_blocking async fn also_good() { tokio::task::spawn_blocking(|| { std::thread::sleep(Duration::from_secs(1)); }).await.unwrap(); } ``` --- ## 7.2 共享状态的选择 ```rust use std::sync::Arc; use tokio::sync::Mutex; // 读多写少:使用 RwLock let data = Arc::new(tokio::sync::RwLock::new(HashMap::new())); // 短暂锁定、不跨 await:使用 std::sync::Mutex let counter = Arc::new(std::sync::Mutex::new(0)); // 需要跨 await 的长时间锁定:使用 tokio::sync::Mutex let connection = Arc::new(tokio::sync::Mutex::new(Connection::new())); // 只初始化一次:使用 OnceCell use tokio::sync::OnceCell; static CONFIG: OnceCell = OnceCell::const_new(); ``` --- ## 7.3 错误处理 ```rust use tokio::task::JoinHandle; async fn robust_spawn(future: F) -> T where F: std::future::Future + Send + 'static, T: Send + 'static, { let handle: JoinHandle = tokio::spawn(future); match handle.await { Ok(result) => result, Err(e) if e.is_panic() => { panic!("Task panicked: {:?}", e); } Err(e) => { panic!("Task failed: {:?}", e); } } } ``` --- ## 7.4 避免死锁 ```rust // 错误:嵌套锁可能死锁 async fn bad_nested_lock() { let mutex1 = Arc::new(Mutex::new(1)); let mutex2 = Arc::new(Mutex::new(2)); // Task 1: lock mutex1, then mutex2 // Task 2: lock mutex2, then mutex1 // 可能死锁! } // 正确:使用消息传递代替共享状态 async fn good_message_passing() { let (tx, mut rx) = tokio::sync::mpsc::channel(100); // Worker 持有状态 tokio::spawn(async move { let mut state = 0; while let Some(msg) = rx.recv().await { state += msg; } }); // 通过消息修改状态 tx.send(1).await.unwrap(); } ``` --- ## 7.5 生产级配置示例 ```rust use tokio::runtime::{Builder, Runtime}; pub fn create_production_runtime(name: &str, workers: usize) -> Runtime { Builder::new_multi_thread() .worker_threads(workers) .thread_name(format!("{}-worker", name)) .thread_stack_size(4 * 1024 * 1024) .max_blocking_threads(128) .enable_all() .on_thread_start(|| { log::debug!("Worker thread started"); }) .on_thread_stop(|| { log::debug!("Worker thread stopped"); }) .build() .expect("Failed to create runtime") } ``` --- # 第八部分:信号处理 ## 8.1 ctrl_c() 跨平台信号 ```rust use tokio::signal; #[tokio::main] async fn main() { println!("按 Ctrl+C 退出..."); signal::ctrl_c().await.expect("监听失败"); println!("收到退出信号,正在关闭..."); } ``` --- ## 8.2 Unix 信号处理 ```rust use tokio::signal::unix::{signal, SignalKind}; #[tokio::main] async fn main() -> std::io::Result<()> { let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; tokio::select! { _ = tokio::signal::ctrl_c() => { println!("收到 SIGINT"); } _ = sigterm.recv() => { println!("收到 SIGTERM"); } _ = sighup.recv() => { println!("收到 SIGHUP,重新加载配置"); } } Ok(()) } ``` ### 常用信号类型 | SignalKind | 说明 | |------------|------| | `interrupt()` | SIGINT (Ctrl+C) | | `terminate()` | SIGTERM | | `hangup()` | SIGHUP(配置重载) | | `quit()` | SIGQUIT | | `user_defined1()` | SIGUSR1 | | `user_defined2()` | SIGUSR2 | --- ## 8.3 优雅关闭模式 ```rust use tokio::signal; use tokio::sync::broadcast; #[tokio::main] async fn main() { let (shutdown_tx, _) = broadcast::channel::<()>(1); // 启动工作任务 let mut rx = shutdown_tx.subscribe(); tokio::spawn(async move { loop { tokio::select! { _ = rx.recv() => { println!("Worker 收到关闭信号"); break; } _ = do_work() => {} } } }); // 等待 Ctrl+C signal::ctrl_c().await.unwrap(); println!("开始优雅关闭..."); // 通知所有任务关闭 let _ = shutdown_tx.send(()); // 等待清理完成 tokio::time::sleep(std::time::Duration::from_secs(1)).await; } async fn do_work() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } ``` --- # 第九部分:子进程管理 ## 9.1 Command 基础 ```rust use tokio::process::Command; #[tokio::main] async fn main() -> std::io::Result<()> { // 简单执行命令 let output = Command::new("echo") .arg("Hello") .arg("World") .output() .await?; println!("stdout: {}", String::from_utf8_lossy(&output.stdout)); println!("状态: {:?}", output.status); Ok(()) } ``` --- ## 9.2 捕获子进程输出 ```rust use tokio::process::Command; use tokio::io::{AsyncBufReadExt, BufReader}; use std::process::Stdio; #[tokio::main] async fn main() -> std::io::Result<()> { let mut child = Command::new("ping") .args(["-c", "3", "localhost"]) .stdout(Stdio::piped()) .spawn()?; let stdout = child.stdout.take().unwrap(); let mut reader = BufReader::new(stdout).lines(); while let Some(line) = reader.next_line().await? { println!("输出: {}", line); } let status = child.wait().await?; println!("退出状态: {:?}", status); Ok(()) } ``` --- ## 9.3 写入子进程 stdin ```rust use tokio::process::Command; use tokio::io::AsyncWriteExt; use std::process::Stdio; #[tokio::main] async fn main() -> std::io::Result<()> { let mut child = Command::new("cat") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; let mut stdin = child.stdin.take().unwrap(); // 写入数据 stdin.write_all(b"Hello from stdin\n").await?; drop(stdin); // 重要:关闭 stdin 发送 EOF let output = child.wait_with_output().await?; println!("输出: {}", String::from_utf8_lossy(&output.stdout)); Ok(()) } ``` --- ## 9.4 超时和终止 ```rust use tokio::process::Command; use tokio::time::{timeout, Duration}; #[tokio::main] async fn main() -> std::io::Result<()> { let mut child = Command::new("sleep") .arg("100") .spawn()?; // 带超时等待 match timeout(Duration::from_secs(2), child.wait()).await { Ok(status) => println!("正常退出: {:?}", status), Err(_) => { println!("超时,终止进程"); child.kill().await?; } } Ok(()) } ``` --- # 第十部分:LocalSet 本地任务集 ## 10.1 支持 !Send 类型 `LocalSet` 允许运行不实现 `Send` 的 Future(如使用 `Rc`)。 ```rust use tokio::task::{LocalSet, spawn_local}; use std::rc::Rc; #[tokio::main(flavor = "current_thread")] async fn main() { let local = LocalSet::new(); local.run_until(async { let data = Rc::new(vec![1, 2, 3]); // Rc 不是 Send spawn_local(async move { println!("Data: {:?}", data); }).await.unwrap(); }).await; } ``` --- ## 10.2 spawn_local vs spawn ```rust use tokio::task::{LocalSet, spawn_local}; use std::rc::Rc; #[tokio::main(flavor = "current_thread")] async fn main() { let local = LocalSet::new(); local.run_until(async { // spawn_local: 用于 !Send 任务,只在当前线程执行 let rc = Rc::new("local"); spawn_local(async move { println!("{}", rc); }); // tokio::spawn: 用于 Send 任务,可能在其他线程执行 let arc = std::sync::Arc::new("shared"); tokio::spawn(async move { println!("{}", arc); }); }).await; } ``` --- ## 10.3 适用场景 - GUI 应用(主线程 UI 操作) - WebAssembly 环境 - 使用 `Rc`/`RefCell` 共享状态 - 与 C 库 FFI(要求单线程) --- # 第十一部分:Unix 域套接字 ## 11.1 UnixListener 和 UnixStream ```rust use tokio::net::{UnixListener, UnixStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; // 服务端 async fn server() -> std::io::Result<()> { let _ = std::fs::remove_file("/tmp/app.sock"); let listener = UnixListener::bind("/tmp/app.sock")?; loop { let (mut stream, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0u8; 1024]; let n = stream.read(&mut buf).await.unwrap(); stream.write_all(&buf[..n]).await.unwrap(); }); } } // 客户端 async fn client() -> std::io::Result<()> { let mut stream = UnixStream::connect("/tmp/app.sock").await?; stream.write_all(b"Hello Unix Socket").await?; let mut buf = [0u8; 1024]; let n = stream.read(&mut buf).await?; println!("收到: {}", String::from_utf8_lossy(&buf[..n])); Ok(()) } ``` --- ## 11.2 Unix 套接字 vs TCP | 特性 | Unix 域套接字 | TCP localhost | |------|--------------|---------------| | 性能 | 更快(2-3倍) | 较慢 | | 协议开销 | 无 | TCP/IP 头部 | | 跨机器 | 不支持 | 支持 | | 安全性 | 文件权限控制 | 需额外认证 | --- # 第十二部分:Runtime Metrics ## 12.1 获取运行时指标 ```rust use tokio::runtime::Handle; #[tokio::main] async fn main() { let handle = Handle::current(); let metrics = handle.metrics(); println!("Worker 数量: {}", metrics.num_workers()); println!("存活任务数: {}", metrics.num_alive_tasks()); println!("阻塞线程数: {}", metrics.num_blocking_threads()); println!("全局队列深度: {}", metrics.global_queue_depth()); // Per-worker 指标 for i in 0..metrics.num_workers() { println!("Worker {}: 窃取 {} 个任务", i, metrics.worker_steal_count(i)); } } ``` --- ## 12.2 可用指标 | 指标 | 说明 | |------|------| | `num_workers()` | worker 线程数 | | `num_alive_tasks()` | 存活任务数 | | `num_blocking_threads()` | 阻塞线程数 | | `global_queue_depth()` | 全局队列深度 | | `worker_steal_count(i)` | worker i 窃取的任务数 | | `worker_poll_count(i)` | worker i 轮询的任务数 | | `worker_local_queue_depth(i)` | worker i 本地队列深度 | --- # 第十三部分:tokio-util 编解码器 ## 13.1 Framed 和 Codec ```rust use tokio::net::TcpStream; use tokio_util::codec::{Framed, LinesCodec}; use futures::{SinkExt, StreamExt}; async fn line_protocol() -> std::io::Result<()> { let stream = TcpStream::connect("127.0.0.1:8080").await?; let mut framed = Framed::new(stream, LinesCodec::new()); // 发送一行 framed.send("Hello".to_string()).await?; // 接收一行 if let Some(Ok(line)) = framed.next().await { println!("收到: {}", line); } Ok(()) } ``` --- ## 13.2 LengthDelimitedCodec ```rust use tokio_util::codec::{LengthDelimitedCodec, Framed}; // 创建长度前缀编解码器 let codec = LengthDelimitedCodec::builder() .length_field_offset(0) // 长度字段偏移 .length_field_length(4) // 长度字段 4 字节 .length_adjustment(0) // 长度调整 .num_skip(4) // 跳过长度字段 .big_endian() // 大端序 .max_frame_length(1024 * 1024) // 最大帧大小 .new_codec(); ``` --- # 第十四部分:tokio-stream ## 14.1 StreamExt 方法 ```rust use tokio_stream::{StreamExt, wrappers::IntervalStream}; use tokio::time::{interval, Duration}; #[tokio::main] async fn main() { let stream = IntervalStream::new(interval(Duration::from_millis(100))); // take: 只取前 5 个 // map: 转换每个元素 // filter: 过滤元素 let mut stream = stream .take(10) .enumerate() .filter(|(i, _)| *i % 2 == 0) .map(|(i, _)| format!("Tick {}", i)); while let Some(msg) = stream.next().await { println!("{}", msg); } } ``` --- ## 14.2 BroadcastStream 和 WatchStream ```rust use tokio::sync::{broadcast, watch}; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tokio_stream::StreamExt; #[tokio::main] async fn main() { // BroadcastStream: 广播通道转 Stream let (tx, rx) = broadcast::channel::(16); let mut stream = BroadcastStream::new(rx); tx.send(1).unwrap(); while let Some(Ok(value)) = stream.next().await { println!("广播: {}", value); break; } // WatchStream: 监视通道转 Stream(配置热更新) let (tx, rx) = watch::channel("initial"); let mut stream = WatchStream::new(rx); tx.send("updated").unwrap(); while let Some(value) = stream.next().await { println!("配置: {}", value); break; } } ``` --- # 附录:API 速查表 ## Runtime | API | 说明 | |-----|------| | `Runtime::new()` | 创建多线程运行时 | | `Builder::new_current_thread()` | 单线程运行时构建器 | | `Builder::new_multi_thread()` | 多线程运行时构建器 | | `rt.block_on(future)` | 阻塞执行 Future | | `rt.spawn(future)` | 在运行时中 spawn 任务 | | `Handle::current()` | 获取当前运行时句柄 | ## Task | API | 说明 | |-----|------| | `tokio::spawn(future)` | 创建异步任务 | | `task::spawn_blocking(fn)` | 在阻塞线程池执行 | | `task::yield_now()` | 主动让出执行权 | | `JoinHandle::abort()` | 取消任务 | | `JoinSet::new()` | 创建任务集合 | ## I/O | API | 说明 | |-----|------| | `AsyncReadExt::read()` | 异步读取 | | `AsyncWriteExt::write_all()` | 异步写入全部 | | `BufReader::new()` | 创建缓冲读取器 | | `io::copy()` | 复制数据 | | `io::copy_bidirectional()` | 双向复制 | ## Network | API | 说明 | |-----|------| | `TcpListener::bind()` | 绑定 TCP 监听 | | `TcpStream::connect()` | TCP 连接 | | `UdpSocket::bind()` | 绑定 UDP | | `listener.accept()` | 接受连接 | ## Time | API | 说明 | |-----|------| | `sleep(duration)` | 异步延迟 | | `timeout(duration, future)` | 超时包装 | | `interval(duration)` | 周期定时器 | | `Instant::now()` | 当前时间点 | ## Sync | API | 说明 | |-----|------| | `Mutex::new()` | 创建异步互斥锁 | | `RwLock::new()` | 创建读写锁 | | `Semaphore::new(n)` | 创建信号量 | | `Notify::new()` | 创建通知 | | `mpsc::channel(n)` | 创建有界通道 | | `oneshot::channel()` | 创建一次性通道 | | `broadcast::channel(n)` | 创建广播通道 | | `watch::channel(init)` | 创建监视通道 | --- # 第十五部分:Tracing 与调试 ## 15.1 tracing 集成 Tokio 与 `tracing` 深度集成,提供结构化诊断日志。 ### 基础配置 ```toml [dependencies] tokio = { version = "1", features = ["full", "tracing"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } ``` ```rust use tracing::{info, debug, error, instrument, Level}; use tracing_subscriber::{fmt, EnvFilter}; #[tokio::main] async fn main() { // 初始化订阅器 tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .with_target(true) .with_thread_ids(true) .init(); info!("Application started"); let result = process_request("user-123").await; info!(?result, "Request completed"); } #[instrument(skip(user_id), fields(user = %user_id))] async fn process_request(user_id: &str) -> Result { debug!("Processing request"); // 异步操作会自动关联到同一个 span let data = fetch_data().await?; Ok(data) } ``` ### 环境变量控制日志级别 ```bash # 全局 debug 级别 RUST_LOG=debug cargo run # 特定模块级别 RUST_LOG=my_app=debug,tokio=warn cargo run # 细粒度控制 RUST_LOG="my_app::db=trace,my_app::api=info" cargo run ``` ## 15.2 tokio-console 实时调试 `tokio-console` 是异步调试神器,提供任务状态可视化。 ### 配置 ```toml [dependencies] tokio = { version = "1", features = ["full", "tracing"] } console-subscriber = "0.4" ``` ```rust #[tokio::main] async fn main() { // 替代标准 tracing 初始化 console_subscriber::init(); // 应用代码... tokio::spawn(async { loop { tokio::time::sleep(Duration::from_secs(1)).await; } }); } ``` ### 运行调试控制台 ```bash # 安装 tokio-console CLI cargo install tokio-console # 运行应用(需要 RUSTFLAGS 启用 tokio_unstable) RUSTFLAGS="--cfg tokio_unstable" cargo run # 另一个终端运行控制台 tokio-console ``` ### 调试功能 | 功能 | 说明 | |------|------| | Tasks 视图 | 所有任务状态、耗时、轮询次数 | | Resources 视图 | 互斥锁、信号量等待状态 | | 任务详情 | 单个任务的完整生命周期 | | 警告检测 | 检测长时间运行、忙等待等问题 | ## 15.3 调试常见问题 ### 检测阻塞操作 ```rust use std::time::Duration; use tracing::warn; // 检测超过阈值的同步操作 #[instrument] async fn potentially_blocking() { let start = std::time::Instant::now(); // 可能的阻塞操作 let result = tokio::task::spawn_blocking(|| { expensive_sync_computation() }).await; let elapsed = start.elapsed(); if elapsed > Duration::from_millis(100) { warn!(?elapsed, "Operation took longer than expected"); } } ``` ### 任务泄漏检测 ```rust use tokio::task::JoinSet; use tracing::info; async fn managed_tasks() { let mut set = JoinSet::new(); for i in 0..10 { set.spawn(async move { info!(task_id = i, "Task started"); // 工作... info!(task_id = i, "Task completed"); }); } // 等待所有任务完成 while let Some(result) = set.join_next().await { if let Err(e) = result { error!(?e, "Task panicked"); } } info!("All tasks completed, no leaks"); } ``` --- # 第十六部分:异步测试 ## 16.1 #[tokio::test] 宏 ```rust #[tokio::test] async fn test_async_function() { let result = async_operation().await; assert_eq!(result, expected_value); } // 多线程测试运行时 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_parallel() { let (tx, rx) = tokio::sync::oneshot::channel(); tokio::spawn(async move { tx.send(42).unwrap(); }); assert_eq!(rx.await.unwrap(), 42); } // 单线程测试运行时 #[tokio::test(flavor = "current_thread")] async fn test_single_thread() { // 使用 LocalSet 兼容的测试 } ``` ## 16.2 时间控制 (pause/resume) Tokio 提供测试时的时间控制,避免实际等待。 ```rust use tokio::time::{self, Duration, Instant}; #[tokio::test] async fn test_timeout_without_waiting() { // 暂停时间,立即推进 time::pause(); let start = Instant::now(); // 这不会真正等待 1 小时 time::sleep(Duration::from_secs(3600)).await; // 逻辑时间已过 1 小时,但实际耗时接近 0 assert!(start.elapsed() >= Duration::from_secs(3600)); } #[tokio::test] async fn test_interval() { time::pause(); let mut interval = time::interval(Duration::from_secs(10)); let mut count = 0; for _ in 0..5 { interval.tick().await; count += 1; } assert_eq!(count, 5); // 实际耗时接近 0,逻辑时间过去 50 秒 } ``` ### 时间推进控制 ```rust #[tokio::test] async fn test_advance_time() { time::pause(); let handle = tokio::spawn(async { time::sleep(Duration::from_secs(100)).await; "completed" }); // 手动推进时间 50 秒 time::advance(Duration::from_secs(50)).await; // 任务还没完成 assert!(!handle.is_finished()); // 再推进 50 秒 time::advance(Duration::from_secs(50)).await; // 现在完成了 assert_eq!(handle.await.unwrap(), "completed"); } ``` ## 16.3 测试异步代码模式 ### Mock 异步依赖 ```rust use async_trait::async_trait; #[async_trait] trait DataStore { async fn get(&self, key: &str) -> Option; async fn set(&self, key: &str, value: String); } // 生产实现 struct RedisStore { /* ... */ } #[async_trait] impl DataStore for RedisStore { async fn get(&self, key: &str) -> Option { // 实际 Redis 调用 todo!() } async fn set(&self, key: &str, value: String) { todo!() } } // 测试 Mock struct MockStore { data: std::sync::Mutex>, } #[async_trait] impl DataStore for MockStore { async fn get(&self, key: &str) -> Option { self.data.lock().unwrap().get(key).cloned() } async fn set(&self, key: &str, value: String) { self.data.lock().unwrap().insert(key.to_string(), value); } } #[tokio::test] async fn test_with_mock() { let store = MockStore { data: std::sync::Mutex::new(std::collections::HashMap::new()), }; store.set("key", "value".to_string()).await; assert_eq!(store.get("key").await, Some("value".to_string())); } ``` ### 测试超时 ```rust use tokio::time::{timeout, Duration}; #[tokio::test] async fn test_with_timeout() { let result = timeout( Duration::from_secs(5), potentially_slow_operation() ).await; assert!(result.is_ok(), "Operation timed out"); } #[tokio::test] async fn test_expected_timeout() { let result = timeout( Duration::from_millis(100), async { tokio::time::sleep(Duration::from_secs(10)).await; } ).await; assert!(result.is_err(), "Expected timeout"); } ``` ### 并发测试 ```rust #[tokio::test(flavor = "multi_thread")] async fn test_concurrent_access() { use std::sync::Arc; use tokio::sync::Mutex; let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..100 { let counter = Arc::clone(&counter); handles.push(tokio::spawn(async move { let mut lock = counter.lock().await; *lock += 1; })); } for handle in handles { handle.await.unwrap(); } assert_eq!(*counter.lock().await, 100); } ``` --- # 第十七部分:Tower 生态系统 ## 17.1 Tower 简介 Tower 是构建在 Tokio 之上的服务抽象层,提供可组合的中间件。 ```toml [dependencies] tower = { version = "0.5", features = ["full"] } tower-http = { version = "0.6", features = ["full"] } ``` ### Service trait ```rust use tower::Service; use std::future::Future; use std::pin::Pin; // Tower 核心抽象 pub trait Service { type Response; type Error; type Future: Future>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; fn call(&mut self, req: Request) -> Self::Future; } ``` ## 17.2 常用中间件 ### 超时层 ```rust use tower::ServiceBuilder; use tower::timeout::TimeoutLayer; use std::time::Duration; let service = ServiceBuilder::new() .layer(TimeoutLayer::new(Duration::from_secs(10))) .service(my_service); ``` ### 重试层 ```rust use tower::retry::{Retry, Policy}; #[derive(Clone)] struct RetryPolicy { max_retries: usize, } impl Policy for RetryPolicy { type Future = futures::future::Ready<()>; fn retry(&mut self, req: &mut Request, result: &mut Result) -> Option { if self.max_retries > 0 && result.is_err() { self.max_retries -= 1; Some(futures::future::ready(())) } else { None } } fn clone_request(&mut self, req: &Request) -> Option { Some(req.clone()) } } let service = ServiceBuilder::new() .layer(tower::retry::RetryLayer::new(RetryPolicy { max_retries: 3 })) .service(my_service); ``` ### 并发限制 ```rust use tower::limit::ConcurrencyLimitLayer; let service = ServiceBuilder::new() .layer(ConcurrencyLimitLayer::new(100)) // 最多 100 并发请求 .service(my_service); ``` ### 速率限制 ```rust use tower::limit::RateLimitLayer; use std::time::Duration; let service = ServiceBuilder::new() .layer(RateLimitLayer::new(100, Duration::from_secs(1))) // 100 req/s .service(my_service); ``` ## 17.3 tower-http(HTTP 专用中间件) ```rust use tower_http::{ compression::CompressionLayer, cors::CorsLayer, trace::TraceLayer, timeout::TimeoutLayer, }; use tower::ServiceBuilder; let middleware_stack = ServiceBuilder::new() .layer(TraceLayer::new_for_http()) .layer(CompressionLayer::new()) .layer(CorsLayer::permissive()) .layer(TimeoutLayer::new(Duration::from_secs(30))); ``` ## 17.4 与 Axum 集成 ```rust use axum::{Router, routing::get}; use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use std::time::Duration; #[tokio::main] async fn main() { let app = Router::new() .route("/", get(handler)) .layer( ServiceBuilder::new() .layer(TraceLayer::new_for_http()) .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(10))) .layer(tower::limit::ConcurrencyLimitLayer::new(100)) ); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); axum::serve(listener, app).await.unwrap(); } async fn handler() -> &'static str { "Hello, World!" } ``` ## 17.5 Tokio 生态全景 ``` ┌─────────────────────────────────────────────────────────────┐ │ 应用层 │ │ (Axum, Actix-web, Warp, Tonic, Hyper) │ ├─────────────────────────────────────────────────────────────┤ │ 中间件层 │ │ (Tower, tower-http) │ ├─────────────────────────────────────────────────────────────┤ │ 扩展库 │ │ (tokio-stream, tokio-util, tokio-console) │ ├─────────────────────────────────────────────────────────────┤ │ Tokio 核心 │ │ (Runtime, I/O, Time, Sync, Net, Task) │ ├─────────────────────────────────────────────────────────────┤ │ Rust async/await │ │ (Future, Poll, Pin, Waker) │ └─────────────────────────────────────────────────────────────┘ ``` | 层次 | 代表项目 | 用途 | |------|----------|------| | Web 框架 | Axum, Actix-web | 构建 HTTP 服务 | | gRPC | Tonic | 构建 RPC 服务 | | 中间件 | Tower | 服务组合与增强 | | 诊断 | tokio-console, tracing | 调试与监控 | | 数据库 | sqlx, sea-orm | 异步数据库访问 | | HTTP 客户端 | reqwest, hyper | 发起 HTTP 请求 | --- **版本**: 1.1 | **更新**: 2026-01-19 | **文档状态**: 完整