# Tokio 生态系统深度分析:tokio-util codec 与 tokio-stream 本文档深入分析 Tokio 生态系统中两个核心组件:tokio-util 的 codec 模块和 tokio-stream。 适合作为 LLM 参考文档使用,包含详细的代码示例、调用链路和内部原理解释。 --- ## 第一部分:tokio-util codec tokio-util 的 codec 模块提供了一套完整的帧编解码框架,用于将原始字节流转换为有意义的消息帧。 这是实现网络协议的基础设施。 ### 1. Encoder 和 Decoder Trait #### 1.1 Decoder Trait 定义与原理 Decoder trait 用于将字节流解码为帧(frames)。其核心设计理念是: - 从缓冲区中尝试提取完整的帧 - 如果数据不足,返回 `Ok(None)` 等待更多数据 - 如果数据格式错误,返回错误 ```rust use bytes::BytesMut; use std::io; /// 帧解码器 trait /// 用于将字节缓冲区解码为帧 pub trait Decoder { /// 解码后的帧类型 type Item; /// 解码错误类型 /// 必须实现 From,以便 FramedRead 可以直接返回 IO 错误 type Error: From; /// 尝试从缓冲区解码一个帧 /// /// 返回值: /// - Ok(Some(frame)): 成功解码一个完整的帧 /// - Ok(None): 数据不足,需要等待更多数据 /// - Err(e): 数据格式错误,流已损坏 fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error>; /// 当底层 I/O 到达 EOF 时调用 /// 默认实现调用 decode,如果缓冲区非空则返回错误 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { match self.decode(buf)? { Some(frame) => Ok(Some(frame)), None => { if buf.is_empty() { Ok(None) } else { Err(io::Error::new( io::ErrorKind::UnexpectedEof, "bytes remaining in buffer" ).into()) } } } } /// 便捷方法:将 I/O 对象包装为 Framed fn framed(self, io: T) -> Framed where Self: Sized, { Framed::new(io, self) } } ``` **调用链路说明**: 1. `FramedRead` 从底层 `AsyncRead` 读取数据到内部缓冲区 2. 调用 `Decoder::decode()` 尝试解码 3. 如果返回 `Ok(None)`,继续读取更多数据 4. 如果返回 `Ok(Some(frame))`,将帧返回给调用者 5. 当 EOF 时,调用 `decode_eof()` 处理剩余数据 #### 1.2 Encoder Trait 定义与原理 Encoder trait 用于将消息编码为字节: ```rust use bytes::BytesMut; use std::io; /// 帧编码器 trait /// 用于将消息编码为字节 pub trait Encoder { /// 编码错误类型 /// 必须实现 From type Error: From; /// 将一个消息编码到缓冲区 /// /// 参数: /// - item: 要编码的消息 /// - dst: 目标缓冲区(FramedWrite 的内部缓冲区) fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error>; } ``` **编码流程**: 1. 调用者通过 `Sink::send()` 发送消息 2. `FramedWrite` 调用 `Encoder::encode()` 将消息编码到内部缓冲区 3. 缓冲区数据写入底层 `AsyncWrite` --- ### 2. Framed、FramedRead 和 FramedWrite #### 2.1 Framed 结构体 `Framed` 是一个统一的 `Stream + Sink` 接口,同时支持读写: ```rust use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use futures::{StreamExt, SinkExt}; use bytes::Bytes; // 基本使用示例 async fn framed_example(io: T) -> Result<(), Box> where T: AsyncRead + AsyncWrite + Unpin, { // 创建 Framed 实例 let mut framed = Framed::new(io, LengthDelimitedCodec::new()); // 作为 Sink 发送数据 let frame = Bytes::from("hello world"); framed.send(frame).await?; // 作为 Stream 接收数据 while let Some(result) = framed.next().await { match result { Ok(data) => println!("Received: {:?}", data), Err(e) => eprintln!("Error: {}", e), } } Ok(()) } ``` **Framed 内部结构**: ```rust /// Framed 的简化内部结构 pub struct Framed { /// 底层 I/O 对象 inner: T, /// 编解码器 codec: U, /// 读缓冲区 read_buf: BytesMut, /// 写缓冲区 write_buf: BytesMut, /// 是否已到达 EOF eof: bool, /// 当前是否正在刷新 is_readable: bool, } ``` #### 2.2 FramedRead - 只读帧流 当只需要读取时使用 `FramedRead`: ```rust use tokio::fs::File; use tokio::io::AsyncRead; use tokio_util::codec::{FramedRead, BytesCodec, LinesCodec}; use tokio_stream::StreamExt; // 从文件读取原始字节 async fn read_bytes_from_file() -> Result<(), std::io::Error> { let file = File::open("data.bin").await?; let mut reader = FramedRead::new(file, BytesCodec::new()); while let Some(result) = reader.next().await { match result { Ok(bytes) => println!("Read {} bytes", bytes.len()), Err(e) => eprintln!("Error: {}", e), } } Ok(()) } // 逐行读取文本文件 async fn read_lines_from_file() -> Result<(), Box> { let file = File::open("text.txt").await?; let mut reader = FramedRead::new(file, LinesCodec::new()); while let Some(result) = reader.next().await { let line = result?; println!("Line: {}", line); } Ok(()) } ``` **FramedRead 的 Stream 实现原理**: ```rust impl Stream for FramedRead where T: AsyncRead + Unpin, D: Decoder, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // 简化的实现逻辑 loop { // 1. 尝试从缓冲区解码 if let Some(frame) = self.codec.decode(&mut self.read_buf)? { return Poll::Ready(Some(Ok(frame))); } // 2. 如果已到 EOF if self.eof { return match self.codec.decode_eof(&mut self.read_buf)? { Some(frame) => Poll::Ready(Some(Ok(frame))), None => Poll::Ready(None), }; } // 3. 从底层 I/O 读取更多数据 let n = ready!(Pin::new(&mut self.inner).poll_read(cx, &mut self.read_buf)?); if n == 0 { self.eof = true; } } } } ``` #### 2.3 FramedWrite - 只写帧接收器 当只需要写入时使用 `FramedWrite`: ```rust use tokio::io::AsyncWrite; use tokio_util::codec::{FramedWrite, LinesCodec}; use futures::SinkExt; async fn write_lines(io: T, messages: Vec<&str>) -> Result<(), Box> where T: AsyncWrite + Unpin, { let mut writer = FramedWrite::new(io, LinesCodec::new()); for msg in messages { writer.send(msg.to_string()).await?; } // 确保所有数据都已写入 writer.flush().await?; Ok(()) } ``` #### 2.4 分离读写:split() 方法 可以将 `Framed` 分离为独立的读写部分: ```rust use tokio::net::TcpStream; use tokio_util::codec::{Framed, LinesCodec}; use futures::{StreamExt, SinkExt}; async fn split_example(stream: TcpStream) -> Result<(), Box> { let framed = Framed::new(stream, LinesCodec::new()); // 分离为读和写两部分 let (mut writer, mut reader) = framed.split(); // 可以在不同的任务中使用 let read_task = tokio::spawn(async move { while let Some(result) = reader.next().await { if let Ok(line) = result { println!("Received: {}", line); } } }); let write_task = tokio::spawn(async move { writer.send("Hello".to_string()).await.ok(); writer.send("World".to_string()).await.ok(); }); let _ = tokio::join!(read_task, write_task); Ok(()) } ``` --- ### 3. 常用编解码器 #### 3.1 BytesCodec - 原始字节传输 `BytesCodec` 是最简单的编解码器,直接传输原始字节: ```rust use tokio_util::codec::{BytesCodec, Decoder, Encoder}; use bytes::{Bytes, BytesMut, BufMut}; use std::io; /// BytesCodec 的完整实现 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)] pub struct BytesCodec(()); impl BytesCodec { pub fn new() -> BytesCodec { BytesCodec(()) } } impl Decoder for BytesCodec { type Item = BytesMut; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { // 如果缓冲区非空,返回所有可用数据 if !buf.is_empty() { let len = buf.len(); Ok(Some(buf.split_to(len))) } else { Ok(None) } } } impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { buf.reserve(data.len()); buf.put(data); Ok(()) } } impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, data: BytesMut, buf: &mut BytesMut) -> Result<(), io::Error> { buf.reserve(data.len()); buf.put(data); Ok(()) } } ``` **使用场景**: - 原始数据流传输 - 作为其他编解码器的基础 - 文件 I/O 操作 ```rust use tokio::fs::File; use tokio_util::codec::{FramedRead, BytesCodec}; use tokio_stream::StreamExt; async fn stream_file() -> Result<(), std::io::Error> { let file = File::open("large_file.bin").await?; let mut stream = FramedRead::new(file, BytesCodec::new()); let mut total_bytes = 0; while let Some(result) = stream.next().await { let chunk = result?; total_bytes += chunk.len(); // 处理数据块... } println!("Total bytes read: {}", total_bytes); Ok(()) } ``` #### 3.2 LinesCodec - 行文本协议 `LinesCodec` 按行分割文本,使用 `\n` 作为分隔符: ```rust use tokio_util::codec::{Decoder, Encoder, LinesCodec}; use bytes::{Buf, BufMut, BytesMut}; use std::{cmp, io, str}; /// LinesCodec 的核心实现 #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct LinesCodec { // 下次搜索 \n 的起始位置(优化用) next_index: usize, // 最大行长度限制 max_length: usize, // 是否正在丢弃超长行 is_discarding: bool, } impl LinesCodec { /// 创建无长度限制的 LinesCodec /// 警告:对于不可信输入,应使用 new_with_max_length pub fn new() -> LinesCodec { LinesCodec { next_index: 0, max_length: usize::MAX, is_discarding: false, } } /// 创建有最大行长度限制的 LinesCodec /// 推荐用于处理不可信输入,防止内存耗尽攻击 pub fn new_with_max_length(max_length: usize) -> Self { LinesCodec { max_length, ..LinesCodec::new() } } pub fn max_length(&self) -> usize { self.max_length } } impl Decoder for LinesCodec { type Item = String; type Error = LinesCodecError; fn decode(&mut self, buf: &mut BytesMut) -> Result, LinesCodecError> { loop { // 确定搜索范围 let read_to = cmp::min(self.max_length.saturating_add(1), buf.len()); // 从 next_index 开始搜索换行符 let newline_offset = buf[self.next_index..read_to] .iter() .position(|b| *b == b'\n'); match (self.is_discarding, newline_offset) { // 正在丢弃且找到换行符:停止丢弃 (true, Some(offset)) => { buf.advance(offset + self.next_index + 1); self.is_discarding = false; self.next_index = 0; } // 正在丢弃但未找到换行符:继续丢弃 (true, None) => { buf.advance(read_to); self.next_index = 0; if buf.is_empty() { return Ok(None); } } // 未丢弃且找到换行符:返回行 (false, Some(offset)) => { let newline_index = self.next_index + offset; self.next_index = 0; let line = buf.split_to(newline_index + 1); let line = &line[..line.len() - 1]; // 处理 \r\n let line = if line.last() == Some(&b'\r') { &line[..line.len() - 1] } else { line }; let line = str::from_utf8(line) .map_err(|_| io::Error::new( io::ErrorKind::InvalidData, "Invalid UTF-8" ))?; return Ok(Some(line.to_string())); } // 未丢弃且未找到换行符 (false, None) => { if buf.len() > self.max_length { // 超过最大长度,开始丢弃 self.is_discarding = true; return Err(LinesCodecError::MaxLineLengthExceeded); } // 记住搜索位置,等待更多数据 self.next_index = buf.len(); return Ok(None); } } } } } impl Encoder for LinesCodec { type Error = LinesCodecError; fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> { buf.reserve(line.len() + 1); buf.put(line.as_bytes()); buf.put_u8(b'\n'); Ok(()) } } #[derive(Debug)] pub enum LinesCodecError { MaxLineLengthExceeded, Io(io::Error), } ``` **实际使用示例**: ```rust use tokio::net::TcpListener; use tokio_util::codec::{Framed, LinesCodec}; use futures::{StreamExt, SinkExt}; // 简单的 Echo 服务器 async fn echo_server() -> Result<(), Box> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, addr) = listener.accept().await?; println!("New connection from: {}", addr); tokio::spawn(async move { // 使用带长度限制的 LinesCodec 防止攻击 let codec = LinesCodec::new_with_max_length(1024); let mut framed = Framed::new(socket, codec); while let Some(result) = framed.next().await { match result { Ok(line) => { println!("Received: {}", line); // Echo 回去 if framed.send(line).await.is_err() { break; } } Err(e) => { eprintln!("Error: {:?}", e); break; } } } }); } } ``` #### 3.3 LengthDelimitedCodec - 长度前缀协议 `LengthDelimitedCodec` 实现了长度前缀帧协议,是最常用的二进制协议编解码器: ```rust use tokio_util::codec::LengthDelimitedCodec; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::{Framed, FramedRead, FramedWrite}; // 基本使用:4字节大端长度 + 数据 fn basic_length_delimited(io: T) -> Framed { Framed::new(io, LengthDelimitedCodec::new()) } ``` **Builder 模式配置**: ```rust use tokio_util::codec::LengthDelimitedCodec; use tokio::io::AsyncRead; use tokio_stream::StreamExt; // 配置详解 async fn length_delimited_examples() { // 示例1:基本配置 - 2字节长度字段 // 帧格式: [长度: 2字节][数据: N字节] // 输入: \x00\x0BHello world // 解析: 长度=11, 数据="Hello world" let io: &[u8] = b"\x00\x0BHello world"; let mut reader = LengthDelimitedCodec::builder() .length_field_offset(0) // 长度字段从第0字节开始 .length_field_type::() // 长度字段为 u16 (2字节) .length_adjustment(0) // 长度值无需调整 .num_skip(0) // 不跳过任何字节(保留长度字段) .new_read(io); // 示例2:跳过长度字段头部 // 帧格式: [长度: 2字节][数据: N字节] // 输入: \x00\x0BHello world // 长度=11指数据部分长度 // 跳过2字节长度字段,只返回数据 let io: &[u8] = b"\x00\x0BHello world"; let mut reader = LengthDelimitedCodec::builder() .length_field_offset(0) .length_field_type::() .length_adjustment(0) .num_skip(2) // 跳过2字节长度字段 .new_read(io); // 返回: "Hello world" // 示例3:长度字段包含头部大小 // 帧格式: [长度: 2字节][数据: N字节] // 长度=13 表示整个帧长度(包括2字节长度字段本身) let io: &[u8] = b"\x00\x0DHello world"; let mut reader = LengthDelimitedCodec::builder() .length_field_offset(0) .length_field_type::() .length_adjustment(-2) // 长度值减2得到数据长度 .num_skip(2) .new_read(io); // 示例4:有头部的协议 // 帧格式: [HDR1: 1字节][长度: 2字节][HDR2: 1字节][数据: N字节] // 长度字段位于偏移1处 let io: &[u8] = b"\xCA\x00\x0F\xFEHello world"; let mut reader = LengthDelimitedCodec::builder() .length_field_offset(1) // 长度字段从第1字节开始 .length_field_type::() // 2字节长度 .length_adjustment(-3) // 减去 hdr1 + len 的长度 .num_skip(3) // 跳过 hdr1 + len .new_read(io); // 返回: "\xFEHello world" (保留 HDR2) // 示例5:设置最大帧长度 let codec = LengthDelimitedCodec::builder() .max_frame_length(1024 * 1024) // 最大1MB .new_codec(); // 示例6:小端字节序 let codec = LengthDelimitedCodec::builder() .little_endian() .new_codec(); // 示例7:原生字节序(用于本地 IPC) let codec = LengthDelimitedCodec::builder() .native_endian() .new_codec(); } ``` **真实项目示例(来自 bitwarden/clients)**: ```rust use tokio_util::codec::LengthDelimitedCodec; use tokio::io::{stdin, stdout}; const NATIVE_MESSAGING_BUFFER_SIZE: usize = 1024 * 1024; // 1MB async fn native_messaging_proxy() { // Chrome Native Messaging 使用原生字节序的4字节长度前缀 let mut stdin_reader = LengthDelimitedCodec::builder() .max_frame_length(NATIVE_MESSAGING_BUFFER_SIZE) .native_endian() .new_read(tokio::io::stdin()); let mut stdout_writer = LengthDelimitedCodec::builder() .max_frame_length(NATIVE_MESSAGING_BUFFER_SIZE) .native_endian() .new_write(tokio::io::stdout()); // 处理消息... } ``` **真实项目示例(来自 google/tarpc)**: ```rust use tokio::net::TcpStream; use tokio_util::codec::LengthDelimitedCodec; use tokio_serde::formats::Bincode; // RPC 传输层配置 async fn create_rpc_transport(addr: &str) { let stream = TcpStream::connect(addr).await.unwrap(); // 使用 LengthDelimited 进行帧分割 let length_delimited = LengthDelimitedCodec::builder() .length_field_length(4) .new_framed(stream); // 再套一层序列化 let transport = tokio_serde::Framed::new( length_delimited, Bincode::default(), ); } ``` --- ### 4. 自定义编解码器实现 #### 4.1 简单自定义协议 实现一个简单的消息协议:`[type: 1字节][length: 2字节][payload: N字节]` ```rust use tokio_util::codec::{Decoder, Encoder}; use bytes::{Buf, BufMut, BytesMut}; use std::io; /// 消息类型 #[derive(Debug, Clone, PartialEq)] pub enum MessageType { Request, Response, Notification, } impl MessageType { fn from_byte(b: u8) -> Option { match b { 0 => Some(MessageType::Request), 1 => Some(MessageType::Response), 2 => Some(MessageType::Notification), _ => None, } } fn to_byte(&self) -> u8 { match self { MessageType::Request => 0, MessageType::Response => 1, MessageType::Notification => 2, } } } /// 消息结构 #[derive(Debug, Clone)] pub struct Message { pub msg_type: MessageType, pub payload: Vec, } /// 自定义编解码器 pub struct MessageCodec; impl MessageCodec { pub fn new() -> Self { MessageCodec } } impl Decoder for MessageCodec { type Item = Message; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { // 至少需要3字节头部 (type + length) if src.len() < 3 { return Ok(None); } // 读取头部但不消费 let msg_type_byte = src[0]; let length = u16::from_be_bytes([src[1], src[2]]) as usize; // 检查是否有完整的消息 let total_length = 3 + length; if src.len() < total_length { // 预留足够空间 src.reserve(total_length - src.len()); return Ok(None); } // 解析消息类型 let msg_type = MessageType::from_byte(msg_type_byte) .ok_or_else(|| io::Error::new( io::ErrorKind::InvalidData, format!("Invalid message type: {}", msg_type_byte) ))?; // 消费头部 src.advance(3); // 提取 payload let payload = src.split_to(length).to_vec(); Ok(Some(Message { msg_type, payload })) } } impl Encoder for MessageCodec { type Error = io::Error; fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { let payload_len = item.payload.len(); // 检查长度是否超过 u16 范围 if payload_len > u16::MAX as usize { return Err(io::Error::new( io::ErrorKind::InvalidInput, "Payload too large" )); } // 预留空间 dst.reserve(3 + payload_len); // 写入头部 dst.put_u8(item.msg_type.to_byte()); dst.put_u16(payload_len as u16); // 写入 payload dst.put_slice(&item.payload); Ok(()) } } // 使用示例 use tokio::net::TcpStream; use tokio_util::codec::Framed; use futures::{StreamExt, SinkExt}; async fn use_custom_codec(stream: TcpStream) -> io::Result<()> { let mut framed = Framed::new(stream, MessageCodec::new()); // 发送消息 let msg = Message { msg_type: MessageType::Request, payload: b"Hello, World!".to_vec(), }; framed.send(msg).await?; // 接收消息 while let Some(result) = framed.next().await { let msg = result?; println!("Received {:?}: {:?}", msg.msg_type, msg.payload); } Ok(()) } ``` #### 4.2 带状态的编解码器 实现一个 JSON-RPC 风格的协议,需要追踪请求 ID: ```rust use tokio_util::codec::{Decoder, Encoder}; use bytes::{Buf, BufMut, BytesMut}; use std::io; use std::collections::HashMap; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct JsonRpcRequest { pub jsonrpc: String, pub id: u64, pub method: String, pub params: serde_json::Value, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct JsonRpcResponse { pub jsonrpc: String, pub id: u64, #[serde(skip_serializing_if = "Option::is_none")] pub result: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct JsonRpcError { pub code: i32, pub message: String, } /// 带状态的 JSON-RPC 编解码器 pub struct JsonRpcCodec { /// 下一个请求 ID next_id: u64, /// 待处理的请求(用于客户端追踪) pending_requests: HashMap, /// 最大消息大小 max_size: usize, } impl JsonRpcCodec { pub fn new() -> Self { Self { next_id: 1, pending_requests: HashMap::new(), max_size: 1024 * 1024, // 1MB } } pub fn with_max_size(max_size: usize) -> Self { Self { max_size, ..Self::new() } } /// 生成下一个请求 ID pub fn next_request_id(&mut self) -> u64 { let id = self.next_id; self.next_id += 1; id } /// 记录待处理请求 pub fn track_request(&mut self, id: u64, method: String) { self.pending_requests.insert(id, method); } } #[derive(Debug)] pub enum JsonRpcMessage { Request(JsonRpcRequest), Response(JsonRpcResponse), } impl Decoder for JsonRpcCodec { type Item = JsonRpcMessage; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { // 查找换行符(JSON-RPC over stdio 通常用换行分隔) let newline_pos = src.iter().position(|&b| b == b'\n'); match newline_pos { Some(pos) => { if pos > self.max_size { return Err(io::Error::new( io::ErrorKind::InvalidData, "Message too large" )); } let line = src.split_to(pos); src.advance(1); // 跳过换行符 // 尝试解析为请求或响应 let json_str = std::str::from_utf8(&line) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; // 先尝试解析为请求 if let Ok(req) = serde_json::from_str::(json_str) { return Ok(Some(JsonRpcMessage::Request(req))); } // 再尝试解析为响应 if let Ok(resp) = serde_json::from_str::(json_str) { // 从待处理列表中移除 self.pending_requests.remove(&resp.id); return Ok(Some(JsonRpcMessage::Response(resp))); } Err(io::Error::new( io::ErrorKind::InvalidData, "Invalid JSON-RPC message" )) } None => { if src.len() > self.max_size { return Err(io::Error::new( io::ErrorKind::InvalidData, "Message too large" )); } Ok(None) } } } } impl Encoder for JsonRpcCodec { type Error = io::Error; fn encode(&mut self, item: JsonRpcMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { let json = match &item { JsonRpcMessage::Request(req) => { self.track_request(req.id, req.method.clone()); serde_json::to_string(req) } JsonRpcMessage::Response(resp) => { serde_json::to_string(resp) } }.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; dst.reserve(json.len() + 1); dst.put_slice(json.as_bytes()); dst.put_u8(b'\n'); Ok(()) } } ``` --- ### 5. 协议解析模式 #### 5.1 状态机模式 对于复杂协议,使用状态机管理解析状态: ```rust use tokio_util::codec::Decoder; use bytes::{Buf, BytesMut}; use std::io; /// HTTP 请求(简化版) #[derive(Debug, Clone)] pub struct HttpRequest { pub method: String, pub path: String, pub headers: Vec<(String, String)>, pub body: Vec, } /// 解析状态 enum HttpDecodeState { /// 等待请求行 RequestLine, /// 解析头部 Headers { method: String, path: String, headers: Vec<(String, String)>, }, /// 读取 body Body { method: String, path: String, headers: Vec<(String, String)>, content_length: usize, }, } /// HTTP 请求解码器 pub struct HttpRequestDecoder { state: HttpDecodeState, max_header_size: usize, max_body_size: usize, } impl HttpRequestDecoder { pub fn new() -> Self { Self { state: HttpDecodeState::RequestLine, max_header_size: 8 * 1024, // 8KB max_body_size: 10 * 1024 * 1024, // 10MB } } fn find_line(&self, buf: &BytesMut) -> Option { buf.windows(2).position(|w| w == b"\r\n") } fn parse_request_line(&self, line: &str) -> io::Result<(String, String)> { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() < 2 { return Err(io::Error::new( io::ErrorKind::InvalidData, "Invalid request line" )); } Ok((parts[0].to_string(), parts[1].to_string())) } fn parse_header(&self, line: &str) -> io::Result<(String, String)> { let pos = line.find(':') .ok_or_else(|| io::Error::new( io::ErrorKind::InvalidData, "Invalid header" ))?; let name = line[..pos].trim().to_string(); let value = line[pos + 1..].trim().to_string(); Ok((name, value)) } } impl Decoder for HttpRequestDecoder { type Item = HttpRequest; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { loop { match &mut self.state { HttpDecodeState::RequestLine => { if let Some(pos) = self.find_line(src) { let line = src.split_to(pos); src.advance(2); // 跳过 \r\n let line_str = std::str::from_utf8(&line) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; let (method, path) = self.parse_request_line(line_str)?; self.state = HttpDecodeState::Headers { method, path, headers: Vec::new(), }; } else { return Ok(None); } } HttpDecodeState::Headers { method, path, headers } => { if let Some(pos) = self.find_line(src) { if pos == 0 { // 空行,头部结束 src.advance(2); // 查找 Content-Length let content_length = headers .iter() .find(|(name, _)| name.eq_ignore_ascii_case("content-length")) .and_then(|(_, value)| value.parse::().ok()) .unwrap_or(0); if content_length == 0 { // 无 body,返回请求 let request = HttpRequest { method: std::mem::take(method), path: std::mem::take(path), headers: std::mem::take(headers), body: Vec::new(), }; self.state = HttpDecodeState::RequestLine; return Ok(Some(request)); } if content_length > self.max_body_size { return Err(io::Error::new( io::ErrorKind::InvalidData, "Body too large" )); } self.state = HttpDecodeState::Body { method: std::mem::take(method), path: std::mem::take(path), headers: std::mem::take(headers), content_length, }; } else { // 解析头部 let line = src.split_to(pos); src.advance(2); let line_str = std::str::from_utf8(&line) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; let header = self.parse_header(line_str)?; headers.push(header); } } else { return Ok(None); } } HttpDecodeState::Body { method, path, headers, content_length } => { if src.len() >= *content_length { let body = src.split_to(*content_length).to_vec(); let request = HttpRequest { method: std::mem::take(method), path: std::mem::take(path), headers: std::mem::take(headers), body, }; self.state = HttpDecodeState::RequestLine; return Ok(Some(request)); } else { // 预留空间 src.reserve(*content_length - src.len()); return Ok(None); } } } } } } ``` #### 5.2 组合编解码器模式 将多个编解码器组合使用: ```rust use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tokio_serde::formats::{Bincode, Json, MessagePack}; use tokio_serde::Framed as SerdeFramed; use serde::{Serialize, Deserialize}; #[derive(Debug, Serialize, Deserialize)] pub struct MyMessage { pub id: u64, pub content: String, } // 组合 LengthDelimited + Bincode async fn bincode_transport(io: T) -> SerdeFramed< Framed, MyMessage, MyMessage, Bincode, > where T: tokio::io::AsyncRead + tokio::io::AsyncWrite, { let length_delimited = LengthDelimitedCodec::builder() .length_field_length(4) .new_framed(io); SerdeFramed::new(length_delimited, Bincode::default()) } // 组合 LengthDelimited + JSON async fn json_transport(io: T) -> SerdeFramed< Framed, MyMessage, MyMessage, Json, > where T: tokio::io::AsyncRead + tokio::io::AsyncWrite, { let length_delimited = LengthDelimitedCodec::builder() .length_field_length(4) .new_framed(io); SerdeFramed::new(length_delimited, Json::default()) } ``` --- ## 第二部分:tokio-stream tokio-stream 提供了异步流(Stream)的扩展功能,是处理异步数据序列的核心工具。 ### 1. StreamExt Trait 和常用方法 #### 1.1 StreamExt 核心方法 ```rust use tokio_stream::{StreamExt, self as stream}; #[tokio::main] async fn main() { // 创建一个简单的 Stream let mut stream = stream::iter(vec![1, 2, 3, 4, 5]); // next() - 获取下一个元素 while let Some(value) = stream.next().await { println!("Value: {}", value); } } ``` **StreamExt 主要方法一览**: ```rust use tokio_stream::{StreamExt, self as stream}; use std::time::Duration; async fn stream_ext_examples() { // ========== 创建 Stream ========== // 从迭代器创建 let s1 = stream::iter(vec![1, 2, 3]); // 空 Stream let s2: stream::Empty = stream::empty(); // 单元素 Stream let s3 = stream::once(async { 42 }); // pending Stream(永不完成) let s4: stream::Pending = stream::pending(); // ========== 转换方法 ========== // map - 转换每个元素 let stream = stream::iter(1..=3); let mut mapped = stream.map(|x| x * 2); assert_eq!(mapped.next().await, Some(2)); assert_eq!(mapped.next().await, Some(4)); assert_eq!(mapped.next().await, Some(6)); // filter - 过滤元素 let stream = stream::iter(1..=10); let mut evens = stream.filter(|x| x % 2 == 0); assert_eq!(evens.next().await, Some(2)); assert_eq!(evens.next().await, Some(4)); // filter_map - 同时过滤和转换 let stream = stream::iter(vec!["1", "two", "3", "four"]); let mut numbers = stream.filter_map(|s| s.parse::().ok()); assert_eq!(numbers.next().await, Some(1)); assert_eq!(numbers.next().await, Some(3)); assert_eq!(numbers.next().await, None); // flat_map - 将每个元素展开为 Stream let stream = stream::iter(vec![vec![1, 2], vec![3, 4]]); let mut flat = stream.flat_map(|v| stream::iter(v)); assert_eq!(flat.next().await, Some(1)); assert_eq!(flat.next().await, Some(2)); assert_eq!(flat.next().await, Some(3)); // ========== 限制方法 ========== // take - 只取前 N 个 let stream = stream::iter(1..=100); let mut first_five = stream.take(5); let collected: Vec<_> = first_five.collect().await; assert_eq!(collected, vec![1, 2, 3, 4, 5]); // take_while - 取元素直到条件不满足 let stream = stream::iter(vec![1, 2, 3, 10, 4, 5]); let mut less_than_10 = stream.take_while(|x| *x < 10); let collected: Vec<_> = less_than_10.collect().await; assert_eq!(collected, vec![1, 2, 3]); // skip - 跳过前 N 个 let stream = stream::iter(1..=10); let mut after_skip = stream.skip(5); assert_eq!(after_skip.next().await, Some(6)); // skip_while - 跳过直到条件不满足 let stream = stream::iter(vec![1, 2, 10, 3, 4]); let mut after_skip = stream.skip_while(|x| *x < 10); assert_eq!(after_skip.next().await, Some(10)); // ========== 聚合方法 ========== // collect - 收集到集合 let stream = stream::iter(1..=5); let vec: Vec<_> = stream.collect().await; assert_eq!(vec, vec![1, 2, 3, 4, 5]); // fold - 折叠 let stream = stream::iter(1..=5); let sum = stream.fold(0, |acc, x| async move { acc + x }).await; assert_eq!(sum, 15); // all - 检查所有元素是否满足条件 let stream = stream::iter(vec![2, 4, 6]); let all_even = stream.all(|x| x % 2 == 0).await; assert!(all_even); // any - 检查是否有元素满足条件 let stream = stream::iter(vec![1, 3, 4, 5]); let has_even = stream.any(|x| x % 2 == 0).await; assert!(has_even); // ========== 时间相关 ========== // timeout - 为每个元素设置超时 let stream = stream::iter(vec![1, 2, 3]) .timeout(Duration::from_secs(1)); // 每个 next() 调用有1秒超时 // throttle - 限流(元素之间至少间隔指定时间) let stream = stream::iter(vec![1, 2, 3]) .throttle(Duration::from_millis(100)); // 元素之间至少间隔 100ms // ========== 组合方法 ========== // chain - 连接两个 Stream let s1 = stream::iter(vec![1, 2]); let s2 = stream::iter(vec![3, 4]); let mut chained = s1.chain(s2); assert_eq!(chained.collect::>().await, vec![1, 2, 3, 4]); // merge - 合并两个 Stream(交错产出) let s1 = stream::iter(vec![1, 3, 5]); let s2 = stream::iter(vec![2, 4, 6]); let merged = s1.merge(s2); // 输出顺序取决于哪个 Stream 先准备好 // zip - 配对两个 Stream let s1 = stream::iter(vec![1, 2, 3]); let s2 = stream::iter(vec!["a", "b", "c"]); let mut zipped = s1.zip(s2); assert_eq!(zipped.next().await, Some((1, "a"))); assert_eq!(zipped.next().await, Some((2, "b"))); // ========== 错误处理 ========== // try_next - 用于 Result Stream let stream = stream::iter(vec![Ok(1), Err("error"), Ok(3)]); // stream.try_next().await 返回 Result, E> } ``` #### 1.2 实际应用示例 ```rust use tokio_stream::{StreamExt, self as stream}; use tokio::net::TcpListener; use tokio_util::codec::{Framed, LinesCodec}; use std::time::Duration; // 处理带超时的客户端连接 async fn handle_connections() -> Result<(), Box> { let listener = TcpListener::bind("127.0.0.1:8080").await?; // 将 accept 转换为 Stream let mut incoming = tokio_stream::wrappers::TcpListenerStream::new(listener) .timeout(Duration::from_secs(30)); // 30秒无连接则超时 while let Some(result) = incoming.next().await { match result { Ok(Ok(socket)) => { println!("New connection: {:?}", socket.peer_addr()); tokio::spawn(async move { let mut framed = Framed::new(socket, LinesCodec::new()); // 带超时地读取每一行 let mut lines = framed.timeout(Duration::from_secs(5)); while let Some(result) = lines.next().await { match result { Ok(Ok(line)) => println!("Received: {}", line), Ok(Err(e)) => { eprintln!("Codec error: {}", e); break; } Err(_) => { eprintln!("Read timeout"); break; } } } }); } Ok(Err(e)) => eprintln!("Accept error: {}", e), Err(_) => println!("No connections for 30 seconds"), } } Ok(()) } // 批量处理数据 async fn batch_processing() { let data = stream::iter(1..=100); // 使用 chunks 批量处理 let mut chunks = data.chunks(10); while let Some(chunk) = chunks.next().await { println!("Processing batch: {:?}", chunk); // 批量处理 10 个元素 } } // 数据管道处理 async fn data_pipeline() { let raw_data = stream::iter(vec![ " hello ", "WORLD", " Rust ", "", "Tokio", ]); let processed: Vec = raw_data .map(|s| s.trim().to_lowercase()) // 去空格、转小写 .filter(|s| !s.is_empty()) // 过滤空字符串 .enumerate() // 添加索引 .map(|(i, s)| format!("{}: {}", i, s)) // 格式化 .collect() .await; println!("{:?}", processed); // 输出: ["0: hello", "1: world", "2: rust", "3: tokio"] } ``` --- ### 2. 将异步迭代器转换为 Stream #### 2.1 使用 async_stream 宏 ```rust use async_stream::stream; use tokio_stream::StreamExt; use std::time::Duration; // 使用 async_stream 创建 Stream fn countdown(from: u32) -> impl tokio_stream::Stream { stream! { for i in (1..=from).rev() { tokio::time::sleep(Duration::from_millis(100)).await; yield i; } } } // try_stream 用于产生 Result fn fetch_pages(urls: Vec) -> impl tokio_stream::Stream> { async_stream::try_stream! { for url in urls { let response = reqwest::get(&url).await?; let body = response.text().await?; yield body; } } } #[tokio::main] async fn main() { let mut stream = countdown(5); while let Some(n) = stream.next().await { println!("{}...", n); } println!("Liftoff!"); } ``` #### 2.2 使用 unfold 创建 Stream ```rust use tokio_stream::{StreamExt, self as stream}; // unfold - 从初始状态逐步生成元素 async fn unfold_example() { // 斐波那契数列 let fibonacci = stream::unfold((0u64, 1u64), |state| async move { let (a, b) = state; // 返回 Some((产出值, 新状态)) 继续,None 结束 if a > 100 { None } else { Some((a, (b, a + b))) } }); let nums: Vec<_> = fibonacci.collect().await; println!("Fibonacci: {:?}", nums); // 输出: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89] } // 分页 API 获取 async fn paginated_fetch() { #[derive(Debug)] struct Page { items: Vec, next_cursor: Option, } async fn fetch_page(cursor: Option<&str>) -> Page { // 模拟 API 调用 Page { items: vec!["item1".into(), "item2".into()], next_cursor: cursor.map(|_| "next".to_string()), } } let pages = stream::unfold(Some(String::new()), |cursor| async move { let cursor_ref = cursor.as_ref()?; let page = fetch_page(Some(cursor_ref)).await; let next_state = page.next_cursor.clone(); Some((page, next_state)) }); let mut pages = pages.take(3); // 只取3页 while let Some(page) = pages.next().await { println!("Page: {:?}", page.items); } } ``` #### 2.3 手动实现 Stream ```rust use std::pin::Pin; use std::task::{Context, Poll}; use tokio_stream::Stream; use std::time::Duration; use tokio::time::{Interval, interval}; /// 定时器 Stream pub struct TimerStream { interval: Interval, count: u64, max_count: Option, } impl TimerStream { pub fn new(period: Duration) -> Self { Self { interval: interval(period), count: 0, max_count: None, } } pub fn with_max(period: Duration, max: u64) -> Self { Self { interval: interval(period), count: 0, max_count: Some(max), } } } impl Stream for TimerStream { type Item = u64; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // 检查是否达到最大计数 if let Some(max) = self.max_count { if self.count >= max { return Poll::Ready(None); } } // 等待下一个定时器 tick match Pin::new(&mut self.interval).poll_tick(cx) { Poll::Ready(_) => { self.count += 1; Poll::Ready(Some(self.count)) } Poll::Pending => Poll::Pending, } } } // 使用示例 async fn use_timer_stream() { use tokio_stream::StreamExt; let mut timer = TimerStream::with_max(Duration::from_millis(100), 5); while let Some(count) = timer.next().await { println!("Tick #{}", count); } println!("Timer completed"); } ``` --- ### 3. Stream 组合器 #### 3.1 map - 元素转换 ```rust use tokio_stream::{StreamExt, self as stream}; async fn map_examples() { // 基本 map let stream = stream::iter(1..=5); let doubled: Vec<_> = stream.map(|x| x * 2).collect().await; assert_eq!(doubled, vec![2, 4, 6, 8, 10]); // 异步 map (使用 then) let stream = stream::iter(vec!["hello", "world"]); let uppercased: Vec<_> = stream .then(|s| async move { // 可以在这里做异步操作 tokio::time::sleep(std::time::Duration::from_millis(10)).await; s.to_uppercase() }) .collect() .await; assert_eq!(uppercased, vec!["HELLO", "WORLD"]); // map 配合 Result let stream = stream::iter(vec!["1", "2", "abc", "3"]); let results: Vec<_> = stream .map(|s| s.parse::()) .collect() .await; // results: [Ok(1), Ok(2), Err(...), Ok(3)] } ``` #### 3.2 filter - 元素过滤 ```rust use tokio_stream::{StreamExt, self as stream}; async fn filter_examples() { // 基本 filter let stream = stream::iter(1..=10); let evens: Vec<_> = stream.filter(|x| x % 2 == 0).collect().await; assert_eq!(evens, vec![2, 4, 6, 8, 10]); // 异步 filter let stream = stream::iter(vec![1, 2, 3, 4, 5]); let filtered: Vec<_> = stream .filter(|x| async move { // 模拟异步检查 tokio::time::sleep(std::time::Duration::from_millis(1)).await; x % 2 == 0 }) .collect() .await; assert_eq!(filtered, vec![2, 4]); // filter_map - 同时过滤和转换 let stream = stream::iter(vec!["1", "two", "3", "four", "5"]); let numbers: Vec<_> = stream .filter_map(|s| async move { s.parse::().ok() }) .collect() .await; assert_eq!(numbers, vec![1, 3, 5]); } ``` #### 3.3 take 和 skip - 限制元素 ```rust use tokio_stream::{StreamExt, self as stream}; async fn take_skip_examples() { // take - 只取前 N 个 let stream = stream::iter(1..=100); let first_5: Vec<_> = stream.take(5).collect().await; assert_eq!(first_5, vec![1, 2, 3, 4, 5]); // take_while - 取直到条件不满足 let stream = stream::iter(vec![1, 2, 3, 10, 4, 5]); let less_than_10: Vec<_> = stream.take_while(|x| *x < 10).collect().await; assert_eq!(less_than_10, vec![1, 2, 3]); // skip - 跳过前 N 个 let stream = stream::iter(1..=10); let after_5: Vec<_> = stream.skip(5).collect().await; assert_eq!(after_5, vec![6, 7, 8, 9, 10]); // skip_while - 跳过直到条件不满足 let stream = stream::iter(vec![1, 2, 3, 10, 4, 5]); let from_10: Vec<_> = stream.skip_while(|x| *x < 10).collect().await; assert_eq!(from_10, vec![10, 4, 5]); // 组合使用:分页 let stream = stream::iter(1..=100); let page_3: Vec<_> = stream.skip(20).take(10).collect().await; // 获取第3页(每页10个) assert_eq!(page_3, vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]); } ``` #### 3.4 timeout - 超时控制 ```rust use tokio_stream::{StreamExt, self as stream}; use tokio::time::{Duration, sleep}; use std::io; async fn timeout_examples() { // 为每个元素设置超时 let slow_stream = stream::unfold(0, |state| async move { sleep(Duration::from_millis(state * 100)).await; if state > 5 { None } else { Some((state, state + 1)) } }); let mut timed = slow_stream.timeout(Duration::from_millis(250)); loop { match timed.next().await { Some(Ok(value)) => println!("Got: {}", value), Some(Err(_elapsed)) => { println!("Timeout!"); break; } None => { println!("Stream ended"); break; } } } // timeout_repeating - 使用 Interval 作为超时 let stream = stream::iter(vec![1, 2, 3]); let interval = tokio::time::interval(Duration::from_millis(100)); let mut timed = stream.timeout_repeating(interval); // 每次 next 有100ms超时,超时会重置 } // 实际应用:带超时的网络读取 async fn read_with_timeout(mut stream: S) -> Result, io::Error> where S: tokio_stream::Stream> + Unpin, { use tokio_stream::StreamExt; let mut results = Vec::new(); let mut timed_stream = stream.timeout(Duration::from_secs(5)); while let Some(result) = timed_stream.next().await { match result { Ok(Ok(item)) => results.push(item), Ok(Err(e)) => return Err(e), Err(_) => { return Err(io::Error::new( io::ErrorKind::TimedOut, "Stream read timeout" )); } } } Ok(results) } ``` --- ### 4. BroadcastStream 和 WatchStream #### 4.1 BroadcastStream - 广播通道流 `BroadcastStream` 将 `tokio::sync::broadcast::Receiver` 包装为 Stream: ```rust use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; async fn broadcast_stream_example() { // 创建广播通道 let (tx, _rx) = broadcast::channel::(16); // 创建多个订阅者 let mut subscriber1 = BroadcastStream::new(tx.subscribe()); let mut subscriber2 = BroadcastStream::new(tx.subscribe()); // 发送消息的任务 let tx_clone = tx.clone(); tokio::spawn(async move { for i in 1..=5 { tx_clone.send(format!("Message {}", i)).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; } drop(tx_clone); // 关闭发送端 }); // 订阅者1 let handle1 = tokio::spawn(async move { while let Some(result) = subscriber1.next().await { match result { Ok(msg) => println!("Subscriber 1: {}", msg), Err(e) => eprintln!("Subscriber 1 error: {:?}", e), } } println!("Subscriber 1 done"); }); // 订阅者2 let handle2 = tokio::spawn(async move { while let Some(result) = subscriber2.next().await { match result { Ok(msg) => println!("Subscriber 2: {}", msg), Err(e) => eprintln!("Subscriber 2 error: {:?}", e), } } println!("Subscriber 2 done"); }); let _ = tokio::join!(handle1, handle2); } // 处理 BroadcastStream 的 Lagged 错误 async fn handle_lagged() { use tokio::sync::broadcast::error::RecvError; let (tx, _rx) = broadcast::channel::(2); // 小容量 let mut stream = BroadcastStream::new(tx.subscribe()); // 快速发送,可能导致 lagged for i in 1..=10 { let _ = tx.send(i); } while let Some(result) = stream.next().await { match result { Ok(value) => println!("Got: {}", value), Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => { println!("Missed {} messages", n); // 继续接收后续消息 } } } } ``` #### 4.2 WatchStream - 监视通道流 `WatchStream` 将 `tokio::sync::watch::Receiver` 包装为 Stream: ```rust use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; use std::time::Duration; // 配置热更新示例 #[derive(Debug, Clone, PartialEq)] struct Config { timeout: Duration, max_connections: usize, } async fn config_watcher_example() { // 初始配置 let initial_config = Config { timeout: Duration::from_secs(30), max_connections: 100, }; let (tx, rx) = watch::channel(initial_config); // 创建 WatchStream let mut config_stream = WatchStream::new(rx); // 配置更新任务 let tx_clone = tx.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; tx_clone.send(Config { timeout: Duration::from_secs(60), max_connections: 200, }).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; tx_clone.send(Config { timeout: Duration::from_secs(45), max_connections: 150, }).unwrap(); }); // 监听配置变化 while let Some(config) = config_stream.next().await { println!("Config updated: {:?}", config); // 应用新配置... } } // WatchStream 与 select! 配合使用 async fn watch_with_select() { let (shutdown_tx, shutdown_rx) = watch::channel(false); let mut shutdown_stream = WatchStream::new(shutdown_rx); // 模拟工作循环 let work_handle = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(500)); loop { tokio::select! { _ = interval.tick() => { println!("Doing work..."); } Some(should_shutdown) = shutdown_stream.next() => { if should_shutdown { println!("Shutdown signal received"); break; } } } } println!("Worker stopped"); }); // 2秒后发送关闭信号 tokio::time::sleep(Duration::from_secs(2)).await; shutdown_tx.send(true).unwrap(); work_handle.await.unwrap(); } ``` #### 4.3 其他 Wrappers ```rust use tokio_stream::wrappers::{ TcpListenerStream, UnixListenerStream, IntervalStream, ReceiverStream, UnboundedReceiverStream, SignalStream, }; // TcpListenerStream - TCP 监听器流 async fn tcp_listener_stream_example() { use tokio::net::TcpListener; use tokio_stream::StreamExt; let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); let mut incoming = TcpListenerStream::new(listener); while let Some(result) = incoming.next().await { match result { Ok(socket) => { println!("New connection: {:?}", socket.peer_addr()); } Err(e) => { eprintln!("Accept error: {}", e); } } } } // IntervalStream - 定时器流 async fn interval_stream_example() { use tokio::time::{interval, Duration}; use tokio_stream::StreamExt; let mut ticks = IntervalStream::new(interval(Duration::from_secs(1))) .take(5); while let Some(_) = ticks.next().await { println!("Tick at {:?}", std::time::Instant::now()); } } // ReceiverStream - mpsc 接收器流 async fn receiver_stream_example() { use tokio::sync::mpsc; use tokio_stream::StreamExt; let (tx, rx) = mpsc::channel::(10); let mut stream = ReceiverStream::new(rx); tokio::spawn(async move { for i in 1..=5 { tx.send(i).await.unwrap(); } }); while let Some(value) = stream.next().await { println!("Received: {}", value); } } ``` --- ### 5. Stream 与 select! 配合使用 #### 5.1 基本用法 ```rust use tokio_stream::{StreamExt, self as stream}; use tokio::time::{Duration, sleep}; async fn select_with_stream() { let mut data_stream = stream::iter(vec![1, 2, 3]); let sleep_future = sleep(Duration::from_secs(1)); tokio::pin!(sleep_future); loop { tokio::select! { // 从 Stream 获取下一个元素 maybe_value = data_stream.next() => { match maybe_value { Some(v) => println!("Got value: {}", v), None => { println!("Stream ended"); break; } } } // 超时 _ = &mut sleep_future => { println!("Timeout!"); break; } } } } ``` #### 5.2 多 Stream 合并 ```rust use tokio_stream::{StreamExt, self as stream}; use tokio::time::{Duration, interval}; use tokio_stream::wrappers::IntervalStream; async fn merge_multiple_streams() { // 创建多个不同来源的 Stream let numbers = stream::iter(1..=5); let letters = stream::iter(vec!['a', 'b', 'c', 'd', 'e']); // 使用 zip 配对 let mut zipped = numbers.zip(letters); while let Some((num, letter)) = zipped.next().await { println!("{}{}", num, letter); } } // 使用 select! 处理多个异步流 async fn select_multiple_streams() { let mut stream1 = stream::iter(vec![1, 2, 3]).fuse(); let mut stream2 = stream::iter(vec![10, 20, 30]).fuse(); let mut count1 = 0; let mut count2 = 0; loop { tokio::select! { Some(v) = stream1.next() => { println!("Stream 1: {}", v); count1 += 1; } Some(v) = stream2.next() => { println!("Stream 2: {}", v); count2 += 1; } else => { println!("Both streams completed"); break; } } } println!("Stream 1 produced {} items, Stream 2 produced {} items", count1, count2); } ``` #### 5.3 复杂事件循环 ```rust use tokio::sync::{mpsc, watch, broadcast}; use tokio_stream::{StreamExt, wrappers::{ReceiverStream, WatchStream, BroadcastStream}}; use tokio::time::{Duration, interval}; use tokio_stream::wrappers::IntervalStream; /// 命令类型 #[derive(Debug, Clone)] enum Command { Process(String), Pause, Resume, } /// 事件处理器 struct EventProcessor { commands: ReceiverStream, config: WatchStream, events: BroadcastStream, ticker: IntervalStream, paused: bool, } impl EventProcessor { async fn run(&mut self) { loop { tokio::select! { // 处理命令 Some(cmd) = self.commands.next() => { match cmd { Command::Process(data) => { if !self.paused { println!("Processing: {}", data); } else { println!("Paused, ignoring: {}", data); } } Command::Pause => { self.paused = true; println!("Paused"); } Command::Resume => { self.paused = false; println!("Resumed"); } } } // 配置更新 Some(new_config) = self.config.next() => { println!("Config updated: {}", new_config); } // 广播事件 Some(Ok(event)) = self.events.next() => { println!("Broadcast event: {}", event); } // 定时任务 Some(_) = self.ticker.next() => { if !self.paused { println!("Tick - doing periodic work"); } } // 所有 Stream 都完成 else => { println!("All streams closed, shutting down"); break; } } } } } async fn run_event_processor() { let (cmd_tx, cmd_rx) = mpsc::channel::(10); let (config_tx, config_rx) = watch::channel("initial".to_string()); let (event_tx, _) = broadcast::channel::(16); let mut processor = EventProcessor { commands: ReceiverStream::new(cmd_rx), config: WatchStream::new(config_rx), events: BroadcastStream::new(event_tx.subscribe()), ticker: IntervalStream::new(interval(Duration::from_secs(1))), paused: false, }; // 模拟外部事件 tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(500)).await; cmd_tx.send(Command::Process("data1".into())).await.ok(); tokio::time::sleep(Duration::from_millis(500)).await; config_tx.send("new_config".to_string()).ok(); tokio::time::sleep(Duration::from_millis(500)).await; cmd_tx.send(Command::Pause).await.ok(); tokio::time::sleep(Duration::from_secs(1)).await; cmd_tx.send(Command::Resume).await.ok(); tokio::time::sleep(Duration::from_millis(500)).await; event_tx.send("important_event".into()).ok(); }); // 运行处理器3秒 tokio::select! { _ = processor.run() => {} _ = tokio::time::sleep(Duration::from_secs(3)) => { println!("Timeout, stopping processor"); } } } ``` #### 5.4 优雅关闭模式 ```rust use tokio::sync::watch; use tokio_stream::{StreamExt, wrappers::WatchStream}; use tokio::time::{Duration, interval}; async fn graceful_shutdown_example() { // 关闭信号 let (shutdown_tx, shutdown_rx) = watch::channel(false); let mut shutdown = WatchStream::new(shutdown_rx); // 工作任务 let work_handle = tokio::spawn(async move { let mut work_interval = interval(Duration::from_millis(200)); loop { tokio::select! { biased; // 优先检查关闭信号 Some(true) = shutdown.next() => { println!("Shutdown signal received, cleaning up..."); // 执行清理工作 tokio::time::sleep(Duration::from_millis(100)).await; println!("Cleanup complete"); break; } _ = work_interval.tick() => { println!("Doing work at {:?}", std::time::Instant::now()); } } } }); // 1秒后发送关闭信号 tokio::time::sleep(Duration::from_secs(1)).await; println!("Sending shutdown signal"); shutdown_tx.send(true).unwrap(); // 等待工作任务完成 work_handle.await.unwrap(); println!("All tasks stopped"); } // 使用 CancellationToken 的更现代方式 use tokio_util::sync::CancellationToken; async fn cancellation_token_example() { let token = CancellationToken::new(); let token_clone = token.clone(); let work_handle = tokio::spawn(async move { let mut interval = interval(Duration::from_millis(200)); loop { tokio::select! { _ = token_clone.cancelled() => { println!("Cancelled!"); break; } _ = interval.tick() => { println!("Working..."); } } } }); tokio::time::sleep(Duration::from_secs(1)).await; token.cancel(); work_handle.await.unwrap(); } ``` --- ## 附录:常用模式速查 ### Codec 选择指南 | 场景 | 推荐 Codec | |------|-----------| | 原始字节流 | BytesCodec | | 行文本协议 (SMTP, POP3) | LinesCodec | | 二进制长度前缀协议 | LengthDelimitedCodec | | JSON-RPC | LinesCodec + serde_json | | 自定义二进制协议 | 自定义 Decoder/Encoder | | gRPC/Protobuf | LengthDelimitedCodec + prost | ### Stream 方法速查 | 方法 | 用途 | 示例 | |------|------|------| | next() | 获取下一个元素 | `stream.next().await` | | map() | 转换元素 | `stream.map(\|x\| x * 2)` | | filter() | 过滤元素 | `stream.filter(\|x\| x % 2 == 0)` | | take(n) | 只取前 n 个 | `stream.take(10)` | | skip(n) | 跳过前 n 个 | `stream.skip(5)` | | timeout() | 设置超时 | `stream.timeout(Duration::from_secs(5))` | | collect() | 收集到集合 | `stream.collect::>().await` | | fold() | 折叠/归约 | `stream.fold(0, \|a, b\| async move { a + b })` | | chain() | 连接两个流 | `stream1.chain(stream2)` | | zip() | 配对两个流 | `stream1.zip(stream2)` | | merge() | 合并两个流 | `stream1.merge(stream2)` | | fuse() | 确保完成后不再 poll | `stream.fuse()` | ### Wrappers 速查 | Wrapper | 来源类型 | 用途 | |---------|---------|------| | BroadcastStream | broadcast::Receiver | 多消费者广播 | | WatchStream | watch::Receiver | 配置/状态监听 | | ReceiverStream | mpsc::Receiver | 消息通道 | | UnboundedReceiverStream | mpsc::UnboundedReceiver | 无界消息通道 | | TcpListenerStream | TcpListener | TCP 连接接受 | | IntervalStream | Interval | 定时器 | --- 本文档基于 tokio 1.x、tokio-util 0.7.x 和 tokio-stream 0.1.x 版本编写。