# Tokio 网络模块深入分析 > 本文档为 LLM 参考文档,深入分析 Tokio 的网络编程能力,包括 TCP、UDP、Unix 域套接字、连接池模式、优雅关闭等核心主题。 ## 目录 1. [TcpListener 和 TcpStream](#1-tcplistener-和-tcpstream) 2. [UdpSocket UDP 编程](#2-udpsocket-udp-编程) 3. [UnixListener 和 UnixStream](#3-unixlistener-和-unixstream) 4. [连接池和连接复用模式](#4-连接池和连接复用模式) 5. [优雅关闭(Graceful Shutdown)](#5-优雅关闭graceful-shutdown) 6. [TCP 选项配置](#6-tcp-选项配置) 7. [地址解析 lookup_host](#7-地址解析-lookup_host) 8. [典型服务器模式](#8-典型服务器模式) --- ## 1. TcpListener 和 TcpStream ### 1.1 核心概念 `TcpListener` 和 `TcpStream` 是 Tokio 中 TCP 网络编程的基础组件: - **TcpListener**: TCP 套接字服务器,监听传入的连接 - **TcpStream**: 表示本地和远程套接字之间已建立的 TCP 连接 ### 1.2 TcpListener 基础用法 ```rust use tokio::net::TcpListener; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 绑定到指定地址和端口 let listener = TcpListener::bind("127.0.0.1:8080").await?; println!("服务器监听在: {}", listener.local_addr()?); loop { // accept() 返回 (TcpStream, SocketAddr) let (socket, addr) = listener.accept().await?; println!("新连接来自: {}", addr); // 处理连接(这里只是占位) process_socket(socket).await; } } async fn process_socket(_socket: T) { // 处理套接字的逻辑 } ``` ### 1.3 TcpListener 的内部原理 ``` TcpListener::bind(addr) | v +------------------+ | 1. 解析地址 | -> 将字符串地址转换为 SocketAddr +------------------+ | v +------------------+ | 2. 创建套接字 | -> 调用系统 socket() +------------------+ | v +------------------+ | 3. 设置非阻塞 | -> set_nonblocking(true) +------------------+ | v +------------------+ | 4. 绑定地址 | -> 调用系统 bind() +------------------+ | v +------------------+ | 5. 开始监听 | -> 调用系统 listen() +------------------+ | v +------------------+ | 6. 注册到 mio | -> 注册到事件循环 +------------------+ ``` ### 1.4 accept() 方法详解 `accept()` 方法是取消安全(cancel safe)的: ```rust use tokio::net::TcpListener; use tokio::time::{timeout, Duration}; async fn accept_with_timeout() -> std::io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { // 可以安全地在 select! 或 timeout 中使用 match timeout(Duration::from_secs(5), listener.accept()).await { Ok(Ok((socket, addr))) => { println!("接受连接: {}", addr); // 处理连接 } Ok(Err(e)) => { eprintln!("接受连接失败: {}", e); } Err(_) => { println!("等待连接超时,继续等待..."); } } } } ``` ### 1.5 TcpStream 读写操作 ```rust use tokio::net::TcpStream; use tokio::io::{AsyncReadExt, AsyncWriteExt}; async fn handle_connection(mut stream: TcpStream) -> std::io::Result<()> { let mut buffer = [0u8; 1024]; loop { // 读取数据 let n = match stream.read(&mut buffer).await { Ok(0) => { // 连接已关闭 println!("连接关闭"); return Ok(()); } Ok(n) => n, Err(e) => { eprintln!("读取错误: {}", e); return Err(e); } }; println!("收到 {} 字节: {:?}", n, &buffer[..n]); // 写回数据(echo) stream.write_all(&buffer[..n]).await?; } } ``` ### 1.6 TcpStream 分割(Split) 将 TcpStream 分割为独立的读写半部分: ```rust use tokio::net::TcpStream; use tokio::io::{AsyncReadExt, AsyncWriteExt}; async fn split_stream_example() -> std::io::Result<()> { let stream = TcpStream::connect("127.0.0.1:8080").await?; // 方法1: split() - 借用方式分割 // 两个半部分共享同一个底层套接字的引用 let (mut reader, mut writer) = stream.split(); // 方法2: into_split() - 所有权方式分割 // 返回 OwnedReadHalf 和 OwnedWriteHalf,可以移动到不同任务 let stream = TcpStream::connect("127.0.0.1:8080").await?; let (mut owned_reader, mut owned_writer) = stream.into_split(); // 可以在不同任务中使用 let read_task = tokio::spawn(async move { let mut buf = [0u8; 1024]; loop { match owned_reader.read(&mut buf).await { Ok(0) => break, Ok(n) => println!("读取 {} 字节", n), Err(e) => { eprintln!("读取错误: {}", e); break; } } } }); let write_task = tokio::spawn(async move { owned_writer.write_all(b"Hello, World!").await.unwrap(); }); let _ = tokio::join!(read_task, write_task); Ok(()) } ``` ### 1.7 使用 BufReader 和 BufWriter ```rust use tokio::net::TcpStream; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; async fn buffered_io_example() -> std::io::Result<()> { let stream = TcpStream::connect("127.0.0.1:8080").await?; let (reader, writer) = stream.into_split(); // 使用缓冲读取器 let mut buf_reader = BufReader::new(reader); // 使用缓冲写入器 let mut buf_writer = BufWriter::new(writer); // 按行读取 let mut line = String::new(); buf_reader.read_line(&mut line).await?; println!("收到行: {}", line); // 写入并刷新缓冲区 buf_writer.write_all(b"Response\n").await?; buf_writer.flush().await?; Ok(()) } ``` ### 1.8 TcpStream 连接超时 ```rust use tokio::net::TcpStream; use tokio::time::{timeout, Duration}; use std::io; async fn connect_with_timeout(addr: &str) -> io::Result { match timeout(Duration::from_secs(5), TcpStream::connect(addr)).await { Ok(Ok(stream)) => Ok(stream), Ok(Err(e)) => Err(e), Err(_) => Err(io::Error::new( io::ErrorKind::TimedOut, "连接超时", )), } } ``` --- ## 2. UdpSocket UDP 编程 ### 2.1 UDP 核心概念 UDP(用户数据报协议)是"无连接"的,与 TCP 不同。在 Tokio 中,`UdpSocket` 可以与多个不同的远程地址通信。 ### 2.2 两种使用模式 #### 模式一:一对多(One-to-Many) 使用 `bind` + `send_to` + `recv_from`: ```rust use tokio::net::UdpSocket; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 绑定到本地地址 let sock = UdpSocket::bind("0.0.0.0:8080").await?; let mut buf = [0u8; 1024]; loop { // 从任意地址接收数据 let (len, addr) = sock.recv_from(&mut buf).await?; println!("收到来自 {} 的 {} 字节", addr, len); // 发送响应到发送方 let response = b"收到你的消息!"; sock.send_to(response, addr).await?; } } ``` #### 模式二:一对一(One-to-One) 使用 `connect` + `send` + `recv`: ```rust use tokio::net::UdpSocket; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 绑定到本地地址 let sock = UdpSocket::bind("0.0.0.0:0").await?; // 连接到特定远程地址 sock.connect("127.0.0.1:8081").await?; // 发送数据(只能发送到已连接的地址) sock.send(b"Hello!").await?; // 接收数据(只接收来自已连接地址的数据) let mut buf = [0u8; 1024]; let len = sock.recv(&mut buf).await?; println!("收到 {} 字节: {:?}", len, &buf[..len]); Ok(()) } ``` ### 2.3 UDP Echo 服务器完整示例 ```rust use tokio::net::UdpSocket; use std::io; use std::sync::Arc; #[tokio::main] async fn main() -> io::Result<()> { let sock = UdpSocket::bind("0.0.0.0:8080").await?; let sock = Arc::new(sock); println!("UDP Echo 服务器监听在 0.0.0.0:8080"); let mut buf = [0u8; 65535]; loop { let (len, addr) = sock.recv_from(&mut buf).await?; let sock_clone = sock.clone(); let data = buf[..len].to_vec(); // 为每个数据报生成处理任务 tokio::spawn(async move { println!("收到来自 {} 的 {} 字节", addr, len); // Echo 回去 if let Err(e) = sock_clone.send_to(&data, addr).await { eprintln!("发送失败: {}", e); } }); } } ``` ### 2.4 UDP 广播 ```rust use tokio::net::UdpSocket; use std::io; async fn broadcast_example() -> io::Result<()> { let sock = UdpSocket::bind("0.0.0.0:0").await?; // 启用广播 sock.set_broadcast(true)?; // 发送广播消息 let broadcast_addr = "255.255.255.255:8080"; sock.send_to(b"Hello everyone!", broadcast_addr).await?; println!("广播消息已发送"); Ok(()) } ``` ### 2.5 UDP 组播(Multicast) ```rust use tokio::net::UdpSocket; use std::net::Ipv4Addr; use std::io; async fn multicast_example() -> io::Result<()> { let sock = UdpSocket::bind("0.0.0.0:8080").await?; // 加入组播组 let multicast_addr = Ipv4Addr::new(239, 0, 0, 1); let interface = Ipv4Addr::new(0, 0, 0, 0); // 注意:需要使用底层套接字操作 let std_sock = sock.into_std()?; std_sock.join_multicast_v4(&multicast_addr, &interface)?; let sock = UdpSocket::from_std(std_sock)?; let mut buf = [0u8; 1024]; loop { let (len, addr) = sock.recv_from(&mut buf).await?; println!("组播消息来自 {}: {:?}", addr, &buf[..len]); } } ``` ### 2.6 poll_recv_from 和 try_recv_from 用于更精细的控制: ```rust use tokio::net::UdpSocket; use std::io; async fn poll_based_recv() -> io::Result<()> { let sock = UdpSocket::bind("0.0.0.0:8080").await?; let mut buf = [0u8; 1024]; loop { // 等待套接字可读 sock.readable().await?; // 尝试非阻塞接收 match sock.try_recv_from(&mut buf) { Ok((len, addr)) => { println!("收到来自 {} 的 {} 字节", addr, len); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 实际上没有数据可读,继续等待 continue; } Err(e) => { return Err(e); } } } } ``` --- ## 3. UnixListener 和 UnixStream ### 3.1 概述 Unix 域套接字提供进程间通信(IPC),仅在 Unix 平台可用。相比 TCP/IP: - **更低延迟**: 无需网络栈处理 - **更高吞吐**: 内核直接传输数据 - **更好的安全性**: 可以使用文件系统权限 ### 3.2 UnixListener 基础用法 ```rust use tokio::net::UnixListener; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 删除可能存在的旧套接字文件 let socket_path = "/tmp/my_app.sock"; let _ = std::fs::remove_file(socket_path); // 绑定到 Unix 套接字路径 let listener = UnixListener::bind(socket_path)?; println!("Unix 域套接字服务器启动: {}", socket_path); loop { match listener.accept().await { Ok((stream, addr)) => { println!("新连接: {:?}", addr); tokio::spawn(async move { handle_client(stream).await; }); } Err(e) => { eprintln!("接受连接失败: {}", e); } } } } async fn handle_client(stream: tokio::net::UnixStream) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let (mut reader, mut writer) = stream.into_split(); let mut buf = [0u8; 1024]; loop { match reader.read(&mut buf).await { Ok(0) => break, Ok(n) => { if writer.write_all(&buf[..n]).await.is_err() { break; } } Err(_) => break, } } } ``` ### 3.3 UnixStream 客户端 ```rust use tokio::net::UnixStream; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::io; async fn unix_client() -> io::Result<()> { let socket_path = "/tmp/my_app.sock"; // 连接到 Unix 套接字 let mut stream = UnixStream::connect(socket_path).await?; // 发送数据 stream.write_all(b"Hello, server!").await?; // 接收响应 let mut buf = [0u8; 1024]; let n = stream.read(&mut buf).await?; println!("收到响应: {:?}", &buf[..n]); Ok(()) } ``` ### 3.4 使用 readable() 进行就绪轮询 ```rust use tokio::net::UnixStream; use std::error::Error; use std::io; async fn poll_based_read() -> Result<(), Box> { let stream = UnixStream::connect("/tmp/my_app.sock").await?; let mut msg = vec![0; 1024]; loop { // 等待套接字可读 stream.readable().await?; // 尝试读取数据,可能因为就绪事件是误报而失败 match stream.try_read(&mut msg) { Ok(n) => { msg.truncate(n); break; } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 实际没有数据,继续等待 continue; } Err(e) => { return Err(e.into()); } } } println!("GOT = {:?}", msg); Ok(()) } ``` ### 3.5 UnixStream::pair() 创建匿名套接字对 ```rust use tokio::net::UnixStream; use tokio::io::{AsyncReadExt, AsyncWriteExt}; async fn socket_pair_example() -> std::io::Result<()> { // 创建一对连接的匿名 Unix 流套接字 let (mut stream1, mut stream2) = UnixStream::pair()?; // 在任务1中发送 let task1 = tokio::spawn(async move { stream1.write_all(b"Hello from task 1").await.unwrap(); }); // 在任务2中接收 let task2 = tokio::spawn(async move { let mut buf = [0u8; 1024]; let n = stream2.read(&mut buf).await.unwrap(); println!("Task 2 收到: {:?}", &buf[..n]); }); let _ = tokio::join!(task1, task2); Ok(()) } ``` ### 3.6 UnixDatagram(数据报类型) ```rust use tokio::net::UnixDatagram; use std::io; async fn unix_datagram_example() -> io::Result<()> { let socket_path = "/tmp/my_datagram.sock"; let _ = std::fs::remove_file(socket_path); // 绑定数据报套接字 let sock = UnixDatagram::bind(socket_path)?; let mut buf = [0u8; 1024]; // 接收数据报 let (len, addr) = sock.recv_from(&mut buf).await?; println!("收到来自 {:?} 的 {} 字节", addr, len); // 发送响应 if let Some(path) = addr.as_pathname() { sock.send_to(b"Response", path).await?; } Ok(()) } ``` ### 3.7 设置 Unix 套接字权限 ```rust use tokio::net::UnixListener; use std::os::unix::fs::PermissionsExt; use std::io; async fn create_socket_with_permissions() -> io::Result<()> { let socket_path = "/tmp/secure.sock"; let _ = std::fs::remove_file(socket_path); let listener = UnixListener::bind(socket_path)?; // 设置权限:仅所有者可读写 std::fs::set_permissions( socket_path, std::fs::Permissions::from_mode(0o600), )?; println!("安全套接字已创建: {}", socket_path); loop { let (stream, _) = listener.accept().await?; // 处理连接... } } ``` --- ## 4. 连接池和连接复用模式 ### 4.1 连接池的必要性 - **减少连接建立开销**: TCP 三次握手耗时 - **资源复用**: 避免频繁创建/销毁连接 - **限制并发连接数**: 保护服务器资源 ### 4.2 简单连接池实现 ```rust use tokio::net::TcpStream; use tokio::sync::{Mutex, Semaphore}; use std::collections::VecDeque; use std::sync::Arc; use std::io; pub struct ConnectionPool { connections: Mutex>, semaphore: Semaphore, address: String, max_size: usize, } impl ConnectionPool { pub fn new(address: String, max_size: usize) -> Arc { Arc::new(Self { connections: Mutex::new(VecDeque::with_capacity(max_size)), semaphore: Semaphore::new(max_size), address, max_size, }) } /// 获取连接 pub async fn get(&self) -> io::Result { // 获取信号量许可 let permit = self.semaphore.acquire().await.unwrap(); // 尝试从池中获取现有连接 let mut pool = self.connections.lock().await; if let Some(conn) = pool.pop_front() { drop(pool); return Ok(PooledConnection { conn: Some(conn), pool: self, }); } drop(pool); // 创建新连接 let conn = TcpStream::connect(&self.address).await?; Ok(PooledConnection { conn: Some(conn), pool: self, }) } /// 归还连接 async fn return_connection(&self, conn: TcpStream) { let mut pool = self.connections.lock().await; if pool.len() < self.max_size { pool.push_back(conn); } // 如果池已满,连接会被丢弃 } } /// 池化连接包装器 pub struct PooledConnection<'a> { conn: Option, pool: &'a ConnectionPool, } impl<'a> PooledConnection<'a> { pub fn stream(&mut self) -> &mut TcpStream { self.conn.as_mut().unwrap() } } impl<'a> Drop for PooledConnection<'a> { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let pool = self.pool; // 在后台任务中归还连接 tokio::spawn(async move { pool.return_connection(conn).await; }); } } } // 使用示例 async fn use_pool() -> io::Result<()> { let pool = ConnectionPool::new("127.0.0.1:8080".to_string(), 10); // 获取连接 let mut conn = pool.get().await?; // 使用连接 use tokio::io::AsyncWriteExt; conn.stream().write_all(b"Hello").await?; // conn 在作用域结束时自动归还到池 Ok(()) } ``` ### 4.3 带健康检查的连接池 ```rust use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::time::{Duration, Instant}; use std::collections::VecDeque; use std::sync::Arc; use std::io; struct PooledTcpStream { stream: TcpStream, created_at: Instant, last_used: Instant, } pub struct HealthyConnectionPool { connections: Arc>>, address: String, max_idle_time: Duration, max_lifetime: Duration, } impl HealthyConnectionPool { pub fn new(address: String) -> Self { Self { connections: Arc::new(Mutex::new(VecDeque::new())), address, max_idle_time: Duration::from_secs(60), // 空闲超过60秒移除 max_lifetime: Duration::from_secs(300), // 最长生存5分钟 } } /// 启动后台清理任务 pub fn start_cleanup_task(&self) { let connections = self.connections.clone(); let max_idle = self.max_idle_time; let max_life = self.max_lifetime; tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; let mut pool = connections.lock().await; let now = Instant::now(); // 移除过期连接 pool.retain(|conn| { let idle_ok = now.duration_since(conn.last_used) < max_idle; let life_ok = now.duration_since(conn.created_at) < max_life; idle_ok && life_ok }); } }); } /// 获取健康的连接 pub async fn get(&self) -> io::Result { let mut pool = self.connections.lock().await; let now = Instant::now(); // 查找健康的连接 while let Some(mut conn) = pool.pop_front() { // 检查连接是否过期 if now.duration_since(conn.last_used) >= self.max_idle_time { continue; } if now.duration_since(conn.created_at) >= self.max_lifetime { continue; } // 可以在这里添加更多健康检查(如发送 ping) conn.last_used = now; return Ok(conn.stream); } drop(pool); // 创建新连接 TcpStream::connect(&self.address).await } /// 归还连接 pub async fn put(&self, stream: TcpStream) { let mut pool = self.connections.lock().await; pool.push_back(PooledTcpStream { stream, created_at: Instant::now(), last_used: Instant::now(), }); } } ``` ### 4.4 使用 bb8 或 deadpool 库 推荐使用成熟的连接池库: ```rust // Cargo.toml: // bb8 = "0.8" // bb8-tokio = "0.8" use bb8::{Pool, ManageConnection}; use tokio::net::TcpStream; use std::io; use async_trait::async_trait; struct TcpConnectionManager { address: String, } #[async_trait] impl ManageConnection for TcpConnectionManager { type Connection = TcpStream; type Error = io::Error; async fn connect(&self) -> Result { TcpStream::connect(&self.address).await } async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { // 简单的健康检查:检查连接是否仍然有效 // 可以发送心跳包等 Ok(()) } fn has_broken(&self, _conn: &mut Self::Connection) -> bool { false } } async fn bb8_pool_example() -> Result<(), Box> { let manager = TcpConnectionManager { address: "127.0.0.1:8080".to_string(), }; let pool = Pool::builder() .max_size(15) .min_idle(Some(5)) .build(manager) .await?; // 获取连接 let conn = pool.get().await?; // 使用连接... // 连接在作用域结束时自动归还 Ok(()) } ``` --- ## 5. 优雅关闭(Graceful Shutdown) ### 5.1 优雅关闭的重要性 - **完成正在处理的请求**: 避免请求中断 - **释放资源**: 确保资源正确清理 - **数据一致性**: 确保数据正确持久化 ### 5.2 使用 Ctrl+C 信号 ```rust use tokio::signal; #[tokio::main] async fn main() { println!("服务器启动,按 Ctrl+C 关闭"); // 等待 Ctrl+C 信号 signal::ctrl_c().await.expect("监听信号失败"); println!("收到 Ctrl+C,开始优雅关闭..."); // 执行清理操作 cleanup().await; println!("关闭完成"); } async fn cleanup() { // 清理逻辑 } ``` ### 5.3 使用 tokio::select! 实现优雅关闭 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::signal; use tokio::sync::broadcast; #[tokio::main] async fn main() -> std::io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; // 创建关闭信号广播通道 let (shutdown_tx, _) = broadcast::channel::<()>(1); println!("服务器启动在 127.0.0.1:8080"); loop { tokio::select! { // 接受新连接 result = listener.accept() => { let (socket, addr) = result?; println!("新连接: {}", addr); // 为每个连接创建关闭接收器 let mut shutdown_rx = shutdown_tx.subscribe(); tokio::spawn(async move { handle_connection_with_shutdown(socket, &mut shutdown_rx).await; }); } // 监听关闭信号 _ = signal::ctrl_c() => { println!("收到关闭信号"); // 发送关闭通知给所有连接处理器 let _ = shutdown_tx.send(()); // 等待一段时间让连接完成 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("服务器关闭"); break; } } } Ok(()) } async fn handle_connection_with_shutdown( mut socket: tokio::net::TcpStream, shutdown_rx: &mut broadcast::Receiver<()>, ) { let mut buf = [0u8; 1024]; loop { tokio::select! { // 处理数据 result = socket.read(&mut buf) => { match result { Ok(0) => { println!("客户端断开"); break; } Ok(n) => { if socket.write_all(&buf[..n]).await.is_err() { break; } } Err(_) => break, } } // 收到关闭信号 _ = shutdown_rx.recv() => { println!("连接处理器收到关闭信号,停止处理"); // 可以发送关闭通知给客户端 let _ = socket.write_all(b"Server shutting down\n").await; break; } } } } ``` ### 5.4 使用 watch 通道实现优雅关闭 ```rust use tokio::net::TcpListener; use tokio::sync::watch; use tokio::signal; #[tokio::main] async fn main() -> std::io::Result<()> { // 创建 watch 通道,初始值为 false(未关闭) let (shutdown_tx, shutdown_rx) = watch::channel(false); let listener = TcpListener::bind("127.0.0.1:8080").await?; // 启动关闭信号监听任务 let shutdown_tx_clone = shutdown_tx.clone(); tokio::spawn(async move { signal::ctrl_c().await.expect("监听信号失败"); println!("收到 Ctrl+C,发送关闭信号"); let _ = shutdown_tx_clone.send(true); }); println!("服务器启动"); loop { let mut shutdown_rx = shutdown_rx.clone(); tokio::select! { result = listener.accept() => { let (socket, addr) = result?; println!("新连接: {}", addr); let shutdown_rx = shutdown_rx.clone(); tokio::spawn(async move { handle_with_watch(socket, shutdown_rx).await; }); } // 检查是否收到关闭信号 _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { println!("停止接受新连接"); break; } } } } // 等待现有连接完成 println!("等待现有连接完成..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("服务器完全关闭"); Ok(()) } async fn handle_with_watch( mut socket: tokio::net::TcpStream, mut shutdown_rx: watch::Receiver, ) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buf = [0u8; 1024]; loop { tokio::select! { result = socket.read(&mut buf) => { match result { Ok(0) | Err(_) => break, Ok(n) => { let _ = socket.write_all(&buf[..n]).await; } } } _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { println!("连接处理器:收到关闭信号"); break; } } } } } ``` ### 5.5 使用 CancellationToken (tokio-util) ```rust // Cargo.toml: tokio-util = { version = "0.7", features = ["rt"] } use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> std::io::Result<()> { let token = CancellationToken::new(); let listener = TcpListener::bind("127.0.0.1:8080").await?; // 信号处理任务 let token_clone = token.clone(); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); token_clone.cancel(); }); loop { tokio::select! { result = listener.accept() => { let (socket, _) = result?; let token = token.child_token(); // 创建子 token tokio::spawn(async move { handle_with_cancellation(socket, token).await; }); } _ = token.cancelled() => { println!("主循环取消"); break; } } } Ok(()) } async fn handle_with_cancellation( mut socket: tokio::net::TcpStream, token: CancellationToken, ) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buf = [0u8; 1024]; loop { tokio::select! { result = socket.read(&mut buf) => { match result { Ok(0) | Err(_) => break, Ok(n) => { let _ = socket.write_all(&buf[..n]).await; } } } _ = token.cancelled() => { println!("连接处理被取消"); break; } } } } ``` ### 5.6 等待所有任务完成 ```rust use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio::task::JoinSet; #[tokio::main] async fn main() -> std::io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); // 使用 JoinSet 跟踪所有任务 let mut join_set = JoinSet::new(); // 信号处理 let shutdown_tx_clone = shutdown_tx.clone(); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); let _ = shutdown_tx_clone.send(()).await; }); loop { tokio::select! { result = listener.accept() => { let (socket, addr) = result?; println!("新连接: {}", addr); // 将任务添加到 JoinSet join_set.spawn(async move { handle_connection(socket).await; }); } _ = shutdown_rx.recv() => { println!("开始关闭..."); break; } } } // 等待所有任务完成 println!("等待 {} 个任务完成...", join_set.len()); while let Some(result) = join_set.join_next().await { match result { Ok(_) => println!("任务完成"), Err(e) => eprintln!("任务失败: {}", e), } } println!("所有任务已完成,服务器关闭"); Ok(()) } async fn handle_connection(mut socket: tokio::net::TcpStream) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buf = [0u8; 1024]; loop { match socket.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { let _ = socket.write_all(&buf[..n]).await; } } } } ``` --- ## 6. TCP 选项配置 ### 6.1 TcpSocket 概述 `TcpSocket` 提供了在连接建立之前配置套接字选项的能力: ```rust use tokio::net::TcpSocket; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 创建 IPv4 TCP 套接字 let socket = TcpSocket::new_v4()?; // 或创建 IPv6 套接字 // let socket = TcpSocket::new_v6()?; // 配置套接字选项 socket.set_reuseaddr(true)?; socket.set_reuseport(true)?; // Linux/BSD 特有 socket.set_nodelay(true)?; socket.set_keepalive(true)?; // 绑定并监听 socket.bind("127.0.0.1:8080".parse().unwrap())?; let listener = socket.listen(1024)?; Ok(()) } ``` ### 6.2 TCP_NODELAY(禁用 Nagle 算法) ```rust use tokio::net::{TcpSocket, TcpStream}; use std::io; // 方式1:通过 TcpSocket 设置 async fn connect_with_nodelay_socket() -> io::Result { let socket = TcpSocket::new_v4()?; socket.set_nodelay(true)?; socket.connect("127.0.0.1:8080".parse().unwrap()).await } // 方式2:在 TcpStream 上设置 async fn set_nodelay_on_stream() -> io::Result<()> { let stream = TcpStream::connect("127.0.0.1:8080").await?; // 禁用 Nagle 算法,数据立即发送 stream.set_nodelay(true)?; // 查询当前设置 let nodelay = stream.nodelay()?; println!("TCP_NODELAY: {}", nodelay); Ok(()) } ``` **Nagle 算法说明**: - **启用时(默认)**: 小数据包会被缓冲,等待更多数据或确认后再发送 - **禁用时**: 数据立即发送,适合实时应用(游戏、即时通讯) ### 6.3 SO_KEEPALIVE(TCP 保活) ```rust use tokio::net::TcpSocket; use std::io; async fn setup_keepalive() -> io::Result<()> { let socket = TcpSocket::new_v4()?; // 启用 TCP 保活 socket.set_keepalive(true)?; // 查询当前设置 let keepalive = socket.keepalive()?; println!("SO_KEEPALIVE: {}", keepalive); // 绑定并连接 let stream = socket.connect("127.0.0.1:8080".parse().unwrap()).await?; Ok(()) } ``` **保活机制说明**: - 操作系统会定期发送探测包检查连接是否存活 - 如果对端没有响应,连接会被标记为断开 - 具体的探测间隔和重试次数由系统配置决定 ### 6.4 SO_REUSEADDR(地址复用) ```rust use tokio::net::TcpSocket; use std::io; async fn bind_with_reuseaddr() -> io::Result<()> { let addr = "127.0.0.1:8080".parse().unwrap(); let socket = TcpSocket::new_v4()?; // 允许快速重新绑定地址 // 在 Berkeley 派生的系统上,可以快速重用 TIME_WAIT 状态的地址 // 注意:Windows 上的行为不同,可能导致"套接字劫持" #[cfg(not(windows))] socket.set_reuseaddr(true)?; socket.bind(addr)?; let listener = socket.listen(1024)?; println!("监听在 {}", listener.local_addr()?); Ok(()) } ``` ### 6.5 SO_REUSEPORT(端口复用) ```rust use tokio::net::TcpSocket; use std::io; #[cfg(any(target_os = "linux", target_os = "freebsd"))] async fn bind_with_reuseport() -> io::Result<()> { let addr = "127.0.0.1:8080".parse().unwrap(); let socket = TcpSocket::new_v4()?; // 允许多个套接字绑定到同一端口 // 内核会在它们之间进行负载均衡 socket.set_reuseport(true)?; socket.bind(addr)?; let listener = socket.listen(1024)?; Ok(()) } ``` ### 6.6 发送和接收缓冲区大小 ```rust use tokio::net::TcpSocket; use std::io; async fn configure_buffers() -> io::Result<()> { let socket = TcpSocket::new_v4()?; // 设置发送缓冲区大小 socket.set_send_buffer_size(65536)?; // 设置接收缓冲区大小 socket.set_recv_buffer_size(65536)?; // 查询当前设置 println!("发送缓冲区: {} 字节", socket.send_buffer_size()?); println!("接收缓冲区: {} 字节", socket.recv_buffer_size()?); Ok(()) } ``` ### 6.7 本地地址绑定 ```rust use tokio::net::TcpSocket; use std::io; async fn bind_to_specific_interface() -> io::Result<()> { let socket = TcpSocket::new_v4()?; // 绑定到特定本地地址(指定网络接口) socket.bind("192.168.1.100:0".parse().unwrap())?; // 连接到远程地址 // 连接将通过绑定的本地地址发起 let stream = socket.connect("93.184.216.34:80".parse().unwrap()).await?; println!("本地地址: {}", stream.local_addr()?); println!("远程地址: {}", stream.peer_addr()?); Ok(()) } ``` ### 6.8 完整的套接字配置示例 ```rust use tokio::net::TcpSocket; use std::io; async fn fully_configured_server() -> io::Result<()> { let addr = "0.0.0.0:8080".parse().unwrap(); let socket = TcpSocket::new_v4()?; // 地址复用 socket.set_reuseaddr(true)?; // 端口复用(Linux/BSD) #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))] socket.set_reuseport(true)?; // 禁用 Nagle 算法 socket.set_nodelay(true)?; // 启用保活 socket.set_keepalive(true)?; // 设置缓冲区 socket.set_send_buffer_size(262144)?; // 256KB socket.set_recv_buffer_size(262144)?; // 256KB // 绑定 socket.bind(addr)?; // 开始监听,backlog 设为 1024 let listener = socket.listen(1024)?; println!("服务器配置完成,监听在 {}", listener.local_addr()?); // 打印配置信息 println!("配置信息:"); println!(" 发送缓冲区: 262144 字节"); println!(" 接收缓冲区: 262144 字节"); println!(" TCP_NODELAY: true"); println!(" SO_KEEPALIVE: true"); println!(" SO_REUSEADDR: true"); loop { let (stream, addr) = listener.accept().await?; println!("新连接来自: {}", addr); // 在已建立的流上也可以修改某些选项 stream.set_nodelay(true)?; tokio::spawn(async move { // 处理连接 }); } } ``` ### 6.9 从标准库套接字转换 ```rust use tokio::net::{TcpListener, TcpStream}; use std::net::{TcpListener as StdTcpListener, TcpStream as StdTcpStream}; use std::io; async fn from_std_example() -> io::Result<()> { // 创建标准库 TcpListener let std_listener = StdTcpListener::bind("127.0.0.1:8080")?; // 设置为非阻塞模式(必需) std_listener.set_nonblocking(true)?; // 转换为 Tokio TcpListener let listener = TcpListener::from_std(std_listener)?; // 同样可以转换 TcpStream let std_stream = StdTcpStream::connect("127.0.0.1:8080")?; std_stream.set_nonblocking(true)?; let stream = TcpStream::from_std(std_stream)?; Ok(()) } ``` --- ## 7. 地址解析 lookup_host ### 7.1 基本用法 `tokio::net::lookup_host` 异步解析主机名到套接字地址: ```rust use tokio::net::lookup_host; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 解析主机名(需要包含端口) let addrs = lookup_host("www.google.com:443").await?; for addr in addrs { println!("解析到: {}", addr); } Ok(()) } ``` ### 7.2 解析并连接 ```rust use tokio::net::{lookup_host, TcpStream}; use std::io; async fn connect_by_hostname(host: &str, port: u16) -> io::Result { let addr_str = format!("{}:{}", host, port); // 获取所有解析结果 let addrs: Vec<_> = lookup_host(&addr_str).await?.collect(); if addrs.is_empty() { return Err(io::Error::new( io::ErrorKind::NotFound, format!("无法解析主机名: {}", host), )); } // 尝试连接第一个地址 TcpStream::connect(addrs[0]).await } ``` ### 7.3 带回退的连接 ```rust use tokio::net::{lookup_host, TcpStream}; use std::io; use std::net::SocketAddr; async fn connect_with_fallback(host: &str, port: u16) -> io::Result { let addr_str = format!("{}:{}", host, port); let addrs: Vec = lookup_host(&addr_str) .await? .collect(); let mut last_error = None; // 尝试所有解析到的地址 for addr in addrs { match TcpStream::connect(addr).await { Ok(stream) => { println!("成功连接到: {}", addr); return Ok(stream); } Err(e) => { println!("连接 {} 失败: {}", addr, e); last_error = Some(e); } } } Err(last_error.unwrap_or_else(|| { io::Error::new(io::ErrorKind::NotFound, "无可用地址") })) } ``` ### 7.4 区分 IPv4 和 IPv6 ```rust use tokio::net::lookup_host; use std::net::{IpAddr, SocketAddr}; use std::io; async fn resolve_with_preference(host: &str, port: u16, prefer_v6: bool) -> io::Result { let addr_str = format!("{}:{}", host, port); let addrs: Vec = lookup_host(&addr_str) .await? .collect(); // 分类地址 let (v4_addrs, v6_addrs): (Vec<_>, Vec<_>) = addrs .into_iter() .partition(|addr| addr.is_ipv4()); // 根据偏好选择 let preferred = if prefer_v6 { &v6_addrs } else { &v4_addrs }; let fallback = if prefer_v6 { &v4_addrs } else { &v6_addrs }; preferred .first() .or_else(|| fallback.first()) .copied() .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "无可用地址")) } // 只获取 IPv4 地址 async fn resolve_v4_only(host: &str, port: u16) -> io::Result> { let addr_str = format!("{}:{}", host, port); let addrs = lookup_host(&addr_str) .await? .filter(|addr| addr.is_ipv4()) .collect(); Ok(addrs) } ``` ### 7.5 带缓存的 DNS 解析器 ```rust use tokio::net::lookup_host; use tokio::sync::RwLock; use tokio::time::{Duration, Instant}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::io; struct CachedResolver { cache: Arc>>, ttl: Duration, } struct CacheEntry { addrs: Vec, expires_at: Instant, } impl CachedResolver { fn new(ttl: Duration) -> Self { Self { cache: Arc::new(RwLock::new(HashMap::new())), ttl, } } async fn resolve(&self, host: &str, port: u16) -> io::Result> { let key = format!("{}:{}", host, port); let now = Instant::now(); // 检查缓存 { let cache = self.cache.read().await; if let Some(entry) = cache.get(&key) { if entry.expires_at > now { return Ok(entry.addrs.clone()); } } } // 缓存未命中或已过期,执行解析 let addrs: Vec = lookup_host(&key) .await? .collect(); // 更新缓存 { let mut cache = self.cache.write().await; cache.insert(key, CacheEntry { addrs: addrs.clone(), expires_at: now + self.ttl, }); } Ok(addrs) } // 启动后台清理任务 fn start_cleanup_task(&self) { let cache = self.cache.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { interval.tick().await; let now = Instant::now(); let mut cache = cache.write().await; cache.retain(|_, entry| entry.expires_at > now); } }); } } // 使用示例 async fn use_cached_resolver() -> io::Result<()> { let resolver = CachedResolver::new(Duration::from_secs(300)); resolver.start_cleanup_task(); // 第一次解析(实际 DNS 查询) let addrs = resolver.resolve("www.example.com", 80).await?; println!("解析结果: {:?}", addrs); // 第二次解析(从缓存获取) let addrs = resolver.resolve("www.example.com", 80).await?; println!("缓存结果: {:?}", addrs); Ok(()) } ``` ### 7.6 处理 IP 地址直接输入 ```rust use tokio::net::lookup_host; use std::net::{IpAddr, SocketAddr}; use std::io; async fn smart_resolve(addr: &str, port: u16) -> io::Result { // 尝试直接解析为 IP 地址 if let Ok(ip) = addr.parse::() { return Ok(SocketAddr::new(ip, port)); } // 如果不是 IP,执行 DNS 解析 let addr_str = format!("{}:{}", addr, port); lookup_host(&addr_str) .await? .next() .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "无法解析地址")) } ``` ### 7.7 ToSocketAddrs trait `TcpStream::connect` 等方法接受实现了 `ToSocketAddrs` 的类型: ```rust use tokio::net::TcpStream; use std::net::SocketAddr; use std::io; async fn connect_examples() -> io::Result<()> { // 字符串形式(会自动解析) let _stream = TcpStream::connect("www.example.com:80").await?; // 元组形式 let _stream = TcpStream::connect(("www.example.com", 80)).await?; // SocketAddr 形式 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); let _stream = TcpStream::connect(addr).await?; // &[SocketAddr] 形式(会尝试所有地址) let addrs: &[SocketAddr] = &[ "127.0.0.1:8080".parse().unwrap(), "127.0.0.1:8081".parse().unwrap(), ]; let _stream = TcpStream::connect(addrs).await?; Ok(()) } ``` --- ## 8. 典型服务器模式 ### 8.1 基础 Accept Loop 模式 最简单的服务器模式: ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::io; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; println!("服务器启动在 127.0.0.1:8080"); loop { // 等待新连接 let (mut socket, addr) = listener.accept().await?; println!("新连接: {}", addr); // 为每个连接生成独立任务 tokio::spawn(async move { let mut buf = [0u8; 1024]; loop { match socket.read(&mut buf).await { Ok(0) => { println!("连接关闭: {}", addr); return; } Ok(n) => { // Echo 回去 if socket.write_all(&buf[..n]).await.is_err() { return; } } Err(e) => { eprintln!("读取错误: {}", e); return; } } } }); } } ``` ### 8.2 带连接限制的服务器 ```rust use tokio::net::TcpListener; use tokio::sync::Semaphore; use std::sync::Arc; use std::io; const MAX_CONNECTIONS: usize = 100; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); println!("服务器启动,最大连接数: {}", MAX_CONNECTIONS); loop { // 获取许可证(如果没有可用的,会等待) let permit = semaphore.clone().acquire_owned().await.unwrap(); let (socket, addr) = listener.accept().await?; println!("新连接: {} (当前连接数: {})", addr, MAX_CONNECTIONS - semaphore.available_permits()); tokio::spawn(async move { handle_connection(socket).await; // permit 在这里被 drop,释放一个槽位 drop(permit); }); } } async fn handle_connection(mut socket: tokio::net::TcpStream) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buf = [0u8; 1024]; loop { match socket.read(&mut buf).await { Ok(0) | Err(_) => return, Ok(n) => { let _ = socket.write_all(&buf[..n]).await; } } } } ``` ### 8.3 带超时的连接处理 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time::{timeout, Duration}; use std::io; const READ_TIMEOUT: Duration = Duration::from_secs(30); const WRITE_TIMEOUT: Duration = Duration::from_secs(10); #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, addr) = listener.accept().await?; tokio::spawn(async move { if let Err(e) = handle_with_timeout(socket).await { eprintln!("连接 {} 处理错误: {}", addr, e); } }); } } async fn handle_with_timeout(mut socket: tokio::net::TcpStream) -> io::Result<()> { let mut buf = [0u8; 1024]; loop { // 带超时的读取 let n = match timeout(READ_TIMEOUT, socket.read(&mut buf)).await { Ok(Ok(0)) => return Ok(()), // 连接关闭 Ok(Ok(n)) => n, Ok(Err(e)) => return Err(e), Err(_) => { return Err(io::Error::new( io::ErrorKind::TimedOut, "读取超时", )); } }; // 带超时的写入 match timeout(WRITE_TIMEOUT, socket.write_all(&buf[..n])).await { Ok(Ok(())) => {} Ok(Err(e)) => return Err(e), Err(_) => { return Err(io::Error::new( io::ErrorKind::TimedOut, "写入超时", )); } } } } ``` ### 8.4 共享状态服务器 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{broadcast, RwLock}; use std::collections::HashMap; use std::sync::Arc; use std::io; type SharedState = Arc>>; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; let state: SharedState = Arc::new(RwLock::new(HashMap::new())); // 广播通道用于通知所有客户端 let (broadcast_tx, _) = broadcast::channel::(100); println!("键值服务器启动"); loop { let (socket, addr) = listener.accept().await?; let state = state.clone(); let broadcast_tx = broadcast_tx.clone(); let mut broadcast_rx = broadcast_tx.subscribe(); tokio::spawn(async move { let (reader, mut writer) = socket.into_split(); let mut reader = BufReader::new(reader); let mut line = String::new(); // 发送欢迎消息 let _ = writer.write_all(b"欢迎! 命令: GET key, SET key value, DEL key\n").await; loop { tokio::select! { // 处理客户端命令 result = reader.read_line(&mut line) => { match result { Ok(0) => break, // 连接关闭 Ok(_) => { let response = process_command(&line, &state, &broadcast_tx).await; let _ = writer.write_all(response.as_bytes()).await; line.clear(); } Err(_) => break, } } // 接收广播消息 result = broadcast_rx.recv() => { if let Ok(msg) = result { let _ = writer.write_all(format!("[广播] {}\n", msg).as_bytes()).await; } } } } println!("客户端 {} 断开", addr); }); } } async fn process_command( line: &str, state: &SharedState, broadcast_tx: &broadcast::Sender, ) -> String { let parts: Vec<&str> = line.trim().splitn(3, ' ').collect(); match parts.as_slice() { ["GET", key] => { let state = state.read().await; match state.get(*key) { Some(value) => format!("OK: {}\n", value), None => "NOT FOUND\n".to_string(), } } ["SET", key, value] => { let mut state = state.write().await; state.insert(key.to_string(), value.to_string()); let _ = broadcast_tx.send(format!("键 '{}' 已更新", key)); "OK\n".to_string() } ["DEL", key] => { let mut state = state.write().await; match state.remove(*key) { Some(_) => { let _ = broadcast_tx.send(format!("键 '{}' 已删除", key)); "DELETED\n".to_string() } None => "NOT FOUND\n".to_string(), } } _ => "ERROR: 无效命令\n".to_string(), } } ``` ### 8.5 分层处理器模式 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::io; use std::future::Future; use std::pin::Pin; // 定义处理器 trait trait Handler: Send + Sync { fn handle<'a>( &'a self, request: &'a [u8], ) -> Pin> + Send + 'a>>; } // Echo 处理器 struct EchoHandler; impl Handler for EchoHandler { fn handle<'a>( &'a self, request: &'a [u8], ) -> Pin> + Send + 'a>> { Box::pin(async move { request.to_vec() }) } } // 大写转换处理器 struct UppercaseHandler; impl Handler for UppercaseHandler { fn handle<'a>( &'a self, request: &'a [u8], ) -> Pin> + Send + 'a>> { Box::pin(async move { request.to_ascii_uppercase() }) } } // 日志中间件 struct LoggingHandler { inner: H, } impl Handler for LoggingHandler { fn handle<'a>( &'a self, request: &'a [u8], ) -> Pin> + Send + 'a>> { Box::pin(async move { println!("收到请求: {} 字节", request.len()); let response = self.inner.handle(request).await; println!("发送响应: {} 字节", response.len()); response }) } } // 服务器 struct Server { handler: H, } impl Server { fn new(handler: H) -> Self { Self { handler } } async fn run(self, addr: &str) -> io::Result<()> { let listener = TcpListener::bind(addr).await?; let handler = std::sync::Arc::new(self.handler); loop { let (mut socket, _) = listener.accept().await?; let handler = handler.clone(); tokio::spawn(async move { let mut buf = [0u8; 1024]; loop { match socket.read(&mut buf).await { Ok(0) => return, Ok(n) => { let response = handler.handle(&buf[..n]).await; if socket.write_all(&response).await.is_err() { return; } } Err(_) => return, } } }); } } } #[tokio::main] async fn main() -> io::Result<()> { // 组合处理器 let handler = LoggingHandler { inner: UppercaseHandler, }; let server = Server::new(handler); server.run("127.0.0.1:8080").await } ``` ### 8.6 HTTP 风格的请求-响应服务器 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use std::io; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; println!("HTTP 服务器启动在 http://127.0.0.1:8080"); loop { let (socket, addr) = listener.accept().await?; tokio::spawn(async move { if let Err(e) = handle_http(socket).await { eprintln!("处理 {} 错误: {}", addr, e); } }); } } async fn handle_http(socket: tokio::net::TcpStream) -> io::Result<()> { let (reader, mut writer) = socket.into_split(); let mut reader = BufReader::new(reader); // 读取请求行 let mut request_line = String::new(); reader.read_line(&mut request_line).await?; // 解析方法和路径 let parts: Vec<&str> = request_line.split_whitespace().collect(); let (method, path) = match parts.as_slice() { [method, path, ..] => (*method, *path), _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "无效请求")), }; // 读取并忽略请求头 loop { let mut header = String::new(); reader.read_line(&mut header).await?; if header.trim().is_empty() { break; } } // 生成响应 let (status, body) = match (method, path) { ("GET", "/") => ("200 OK", "

Hello, World!

"), ("GET", "/api/status") => ("200 OK", r#"{"status": "ok"}"#), ("GET", _) => ("404 Not Found", "

404 Not Found

"), _ => ("405 Method Not Allowed", "

405 Method Not Allowed

"), }; // 发送响应 let response = format!( "HTTP/1.1 {}\r\n\ Content-Type: text/html; charset=utf-8\r\n\ Content-Length: {}\r\n\ Connection: close\r\n\ \r\n\ {}", status, body.len(), body ); writer.write_all(response.as_bytes()).await?; Ok(()) } ``` ### 8.7 多监听器服务器 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // 绑定多个地址 let listener1 = TcpListener::bind("127.0.0.1:8080").await?; let listener2 = TcpListener::bind("127.0.0.1:8081").await?; println!("服务器监听在 :8080 和 :8081"); loop { tokio::select! { result = listener1.accept() => { let (socket, addr) = result?; println!("端口 8080 新连接: {}", addr); tokio::spawn(handle_connection(socket, "8080")); } result = listener2.accept() => { let (socket, addr) = result?; println!("端口 8081 新连接: {}", addr); tokio::spawn(handle_connection(socket, "8081")); } } } } async fn handle_connection(mut socket: tokio::net::TcpStream, port: &'static str) { let mut buf = [0u8; 1024]; // 发送端口信息 let greeting = format!("连接到端口 {}\n", port); let _ = socket.write_all(greeting.as_bytes()).await; loop { match socket.read(&mut buf).await { Ok(0) | Err(_) => return, Ok(n) => { let _ = socket.write_all(&buf[..n]).await; } } } } ``` ### 8.8 生产级服务器框架 ```rust use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{broadcast, Semaphore}; use tokio::time::{timeout, Duration}; use std::sync::Arc; use std::io; /// 服务器配置 pub struct ServerConfig { pub addr: String, pub max_connections: usize, pub read_timeout: Duration, pub write_timeout: Duration, } impl Default for ServerConfig { fn default() -> Self { Self { addr: "127.0.0.1:8080".to_string(), max_connections: 1000, read_timeout: Duration::from_secs(30), write_timeout: Duration::from_secs(10), } } } /// 生产级 TCP 服务器 pub struct TcpServer { config: ServerConfig, shutdown_tx: broadcast::Sender<()>, } impl TcpServer { pub fn new(config: ServerConfig) -> Self { let (shutdown_tx, _) = broadcast::channel(1); Self { config, shutdown_tx } } /// 获取关闭发送器 pub fn shutdown_handle(&self) -> broadcast::Sender<()> { self.shutdown_tx.clone() } /// 运行服务器 pub async fn run(&self, handler: H) -> io::Result<()> where H: Fn(tokio::net::TcpStream) -> F + Send + Sync + 'static, F: std::future::Future + Send + 'static, { let listener = TcpListener::bind(&self.config.addr).await?; let semaphore = Arc::new(Semaphore::new(self.config.max_connections)); let handler = Arc::new(handler); let mut shutdown_rx = self.shutdown_tx.subscribe(); println!("服务器启动: {}", self.config.addr); println!("最大连接数: {}", self.config.max_connections); loop { tokio::select! { result = listener.accept() => { let (socket, addr) = result?; // 获取连接许可 let permit = match semaphore.clone().try_acquire_owned() { Ok(permit) => permit, Err(_) => { eprintln!("连接数已满,拒绝: {}", addr); continue; } }; let handler = handler.clone(); let mut shutdown_rx = self.shutdown_tx.subscribe(); let read_timeout = self.config.read_timeout; tokio::spawn(async move { println!("新连接: {}", addr); tokio::select! { _ = handler(socket) => {} _ = shutdown_rx.recv() => { println!("连接 {} 收到关闭信号", addr); } } println!("连接关闭: {}", addr); drop(permit); }); } _ = shutdown_rx.recv() => { println!("服务器收到关闭信号"); break; } } } // 等待所有连接完成 println!("等待现有连接完成..."); let _ = semaphore.acquire_many(self.config.max_connections as u32).await; println!("服务器已关闭"); Ok(()) } } // 使用示例 #[tokio::main] async fn main() -> io::Result<()> { let config = ServerConfig::default(); let server = TcpServer::new(config); // 设置关闭处理 let shutdown_handle = server.shutdown_handle(); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); println!("收到 Ctrl+C"); let _ = shutdown_handle.send(()); }); // 运行服务器 server.run(|mut socket| async move { let mut buf = [0u8; 1024]; loop { match socket.read(&mut buf).await { Ok(0) | Err(_) => return, Ok(n) => { if socket.write_all(&buf[..n]).await.is_err() { return; } } } } }).await } ``` --- ## 附录 A:常用 Cargo 依赖 ```toml [dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["codec", "rt"] } tokio-stream = "0.1" # 可选依赖 bytes = "1" # 高效字节缓冲区 pin-project = "1" # Pin 投影宏 async-trait = "0.1" # 异步 trait 支持 futures = "0.3" # Future 扩展 tracing = "0.1" # 日志/追踪 ``` ## 附录 B:Feature Flags ```toml [dependencies.tokio] version = "1" features = [ "rt", # 运行时核心 "rt-multi-thread", # 多线程运行时 "net", # 网络类型 (TcpListener, TcpStream, UdpSocket 等) "io-util", # AsyncReadExt, AsyncWriteExt 等 "io-std", # 标准输入输出 "time", # 定时器 "sync", # 同步原语 "signal", # 信号处理 "macros", # #[tokio::main], #[tokio::test] 等 "fs", # 文件系统 ] ``` ## 附录 C:错误处理最佳实践 ```rust use std::io; use thiserror::Error; #[derive(Error, Debug)] pub enum ServerError { #[error("IO 错误: {0}")] Io(#[from] io::Error), #[error("连接超时")] Timeout, #[error("连接数已满")] TooManyConnections, #[error("协议错误: {0}")] Protocol(String), } // 使用示例 async fn handle_with_errors(socket: tokio::net::TcpStream) -> Result<(), ServerError> { use tokio::io::AsyncReadExt; use tokio::time::{timeout, Duration}; let mut socket = socket; let mut buf = [0u8; 1024]; // 带超时的读取 let n = timeout(Duration::from_secs(30), socket.read(&mut buf)) .await .map_err(|_| ServerError::Timeout)? .map_err(ServerError::Io)?; if n == 0 { return Ok(()); } // 验证协议 if buf[0] != 0x01 { return Err(ServerError::Protocol("无效的协议头".to_string())); } Ok(()) } ``` --- ## 总结 本文档详细介绍了 Tokio 网络模块的核心组件和使用模式: 1. **TcpListener/TcpStream**: TCP 服务器和客户端的基础 2. **UdpSocket**: 无连接的数据报通信 3. **UnixListener/UnixStream**: Unix 域套接字用于高效 IPC 4. **连接池**: 复用连接提高性能 5. **优雅关闭**: 确保服务正确终止 6. **TCP 选项**: 细粒度配置网络行为 7. **地址解析**: 异步 DNS 查询 8. **服务器模式**: 从简单到生产级的实现 这些知识涵盖了构建高性能异步网络应用所需的全部基础。