管道 - 过滤器(Pipe-and-Filter)架构风格
一、核心定义与本质
管道过滤器是数据流驱动的经典架构模式,属于结构化 / 数据流架构,最早源于 Unix Shell 设计思想。
核心组成两部分
- 过滤器(Filter)
- 独立、无状态、单一职责的数据处理单元;
- 只做一件事:接收输入数据流 → 转换 / 加工 / 过滤数据 → 输出数据流;
- 不共享全局数据,不依赖其他过滤器内部状态,可独立复用、替换、并行运行。
- 管道(Pipe)
- 连接两个过滤器的通道,只负责传输数据流,不修改数据;
- 标准单向传输:上游 Filter 输出 → Pipe → 下游 Filter 输入;
- 解耦:过滤器之间互不感知对方实现,只约定数据格式。
核心思想
数据流经一系列独立处理组件,每个组件只完成单一处理,组件之间仅通过数据流通信。
二、关键特性
优点
- 高复用:单个过滤器可在多个流程中重复使用(如日志过滤、格式转换);
- 易修改扩展:新增处理逻辑只需插入新过滤器,无需改动原有代码;
- 可并行执行:多过滤器可流水线并发处理,提升吞吐量;
- 松耦合:过滤器完全隔离,替换其中一个不影响整条链路;
- 可调试:管道中间可截流、打印数据流,快速定位异常;
- 无状态天然容错:单个过滤器故障不直接击穿全链路(可加熔断)。
缺点
- 数据格式强约束:上下游必须统一数据流格式,格式变更改动全链路;
- 额外 IO 开销:数据反复序列化 / 传输,大数据场景性能损耗明显;
- 不适合交互场景:同步、低延迟人机交互不适合(适合批处理、流式处理);
- 复杂分支难维护:多分支、循环管道会大幅提升架构复杂度。
三、管道过滤器常见拓扑结构
- 线性串联(最基础)Filter1 → Pipe → Filter2 → Pipe → Filter3
- 分叉(广播)一个过滤器输出,通过管道分发给多个下游过滤器并行处理
- 汇聚(合并)多个过滤器输出管道汇入同一个下游过滤器(如多日志合并)
- 循环管道处理后数据回流上游再次过滤(较少用,易死循环)
四、最经典真实底层示例:Unix/Linux Shell 命令(原生管道过滤器)
原理
Unix 每个命令就是Filter,|符号就是Pipe; 每个命令默认:标准输入 stdin、标准输出 stdout,管道把前一个 stdout 接到后一个 stdin。
示例 1:日志过滤完整链路
# 原始日志文件读取(Filter1) → 筛选错误日志(Filter2) → 提取时间字段(Filter3) → 排序(Filter4) → 输出文件(Filter5) cat app.log | grep "ERROR" | awk '{print $1,$2}' | sort > error_time.log拆解对应架构:
- Filter1:
cat— 读取文件,输出原始日志流 - Pipe1:
|— 传输全部日志文本 - Filter2:
grep "ERROR"— 过滤器:只保留包含 ERROR 的行 - Pipe2:
|— 传输错误日志行 - Filter3:
awk— 过滤器:截取每行前两列(日期、时间) - Pipe3:
|— 传输时间文本流 - Filter4:
sort— 过滤器:按时间排序 - Pipe4:
>— 管道写入目标文件
特点:
- 每个命令独立、单一职责,可单独拿出来使用;
- 管道只传文本,不处理业务;
- 随意增删过滤器:加
head -10只看前 10 条,不改动原有逻辑。
五、工程级真实业务系统示例(后端开发常用)
示例 1:实时日志处理系统(ELK 架构 = 标准管道过滤器)
整体链路:原始日志 → Filebeat → Logstash → Elasticsearch → Kibana 逐个映射:
- Filter1 Filebeat:采集服务器日志文件、切割日志流
- Pipe1:网络 TCP 通道传输日志原始文本
- Filter2 Logstash(多过滤器串联)
- grok 过滤器:解析非结构化日志为 JSON 结构化数据
- date 过滤器:统一时间字段格式
- drop 过滤器:丢弃无用调试日志
- mutate 过滤器:新增业务标签字段
- Pipe2:http 管道传输结构化 JSON
- Filter3 Elasticsearch:存储、索引、分词过滤数据
- Pipe3:查询数据流
- Filter4 Kibana:聚合、图表过滤、可视化输出
特点:任意一层过滤器可替换(Filebeat 替换为 Flink 采集,Logstash 替换为 Flink Transform),完全符合管道过滤器设计。
示例 2:大数据实时计算 Flink/Spark Streaming
流式计算是管道过滤器工业级落地: 数据流链路:消息队列 (Kafka) → 数据源 Filter → 清洗 Filter → 转换 Filter → 聚合 Filter → 入库 Filter
- 每个算子 (map/filter/flatMap) = 独立过滤器;
- 上下游算子之间的数据通道 = 管道;
- 可随意插入新算子(脱敏、风控过滤),不修改原有计算逻辑。
业务场景:电商用户行为流处理
- Filter1:读取 Kafka 用户点击日志
- Filter2:清洗:过滤非法埋点、空数据
- Filter3:脱敏:手机号、身份证加密处理
- Filter4:转换:拆分行为字段、关联商品基础数据
- Filter5:聚合:统计每小时商品点击量
- Filter6:输出:写入 MySQL 报表库
六、日志处理Demo
package test; import java.util.ArrayList; import java.util.List; import java.util.Random; // ===================== 统一数据流载体(管道传输的数据) ===================== class DataRecord { private String content; public DataRecord(String content) { this.content = content; } public String getContent() { return content; } } // ===================== 过滤器顶层接口 所有处理单元实现该接口 ===================== interface Filter { List<DataRecord> process(List<DataRecord> input); } // ===================== 管道工具类:串联所有过滤器 ===================== class PipeLine { // 链式执行所有过滤器,数据在管道中流转 public static List<DataRecord> run(List<DataRecord> sourceData, Filter... filters) { List<DataRecord> data = sourceData; for (Filter filter : filters) { // 管道传输数据到下一个过滤器 data = filter.process(data); } return data; } } // ===================== 自定义实体:用户行为 ===================== class UserAction { private String userId; private String action; private Long productId; public UserAction(String userId, String action, Long productId) { this.userId = userId; this.action = action; this.productId = productId; } @Override public String toString() { return "UserAction{" + "userId='" + userId + '\'' + ", action='" + action + '\'' + ", productId=" + productId + '}'; } } // ===================== 各个独立过滤器(对应Flink算子) ===================== // 过滤器1:模拟数据源,批量生成原始日志 class SourceFilter implements Filter { @Override public List<DataRecord> process(List<DataRecord> input) { List<DataRecord> result = new ArrayList<>(); Random random = new Random(); String[] actions = {"click", "buy", "collect", "invalid"}; // 模拟生成10条埋点日志 for (int i = 0; i < 10; i++) { String uid = "u" + random.nextInt(9999); String act = actions[random.nextInt(actions.length)]; long pid = random.nextInt(1000); String log = uid + "," + act + "," + pid; result.add(new DataRecord(log)); } return result; } } // 过滤器2:清洗过滤,丢弃非法脏数据 class CleanFilter implements Filter { @Override public List<DataRecord> process(List<DataRecord> input) { List<DataRecord> result = new ArrayList<>(); for (DataRecord record : input) { String line = record.getContent(); String[] arr = line.split(","); // 过滤格式错误、非法行为 if (arr.length != 3) { continue; } String action = arr[1]; if ("click".equals(action) || "buy".equals(action) || "collect".equals(action)) { result.add(record); } } return result; } } // 过滤器3:字符串日志转结构化对象(模拟map转换算子) class ConvertFilter implements Filter { // 转换后存储对象,单独输出 public List<UserAction> convert(List<DataRecord> input) { List<UserAction> list = new ArrayList<>(); for (DataRecord record : input) { String[] arr = record.getContent().split(","); UserAction action = new UserAction(arr[0], arr[1], Long.valueOf(arr[2])); list.add(action); } return list; } @Override public List<DataRecord> process(List<DataRecord> input) { return input; } } // 过滤器4:用户ID脱敏处理 class MaskFilter { public List<UserAction> mask(List<UserAction> input) { List<UserAction> res = new ArrayList<>(); for (UserAction ua : input) { String rawId = ua.toString().split("userId='")[1].split("'")[0]; String maskId = rawId.substring(0, 2) + "****"; String action = ua.toString().split("action='")[1].split("'")[0]; String pid = ua.toString().split("productId=")[1].split("}")[0]; res.add(new UserAction(maskId, action, Long.valueOf(pid))); } return res; } } // ===================== 主程序:组装整条管道过滤器链路 ===================== public class SimplePipeFilterDemo { public static void main(String[] args) { // 1. 组装过滤器链路 Filter source = new SourceFilter(); Filter clean = new CleanFilter(); ConvertFilter convert = new ConvertFilter(); MaskFilter mask = new MaskFilter(); // 2. 管道流转数据:生成原始数据 -> 清洗 List<DataRecord> rawData = source.process(new ArrayList<>()); List<DataRecord> cleanData = PipeLine.run(rawData, clean); // 3. 管道下游:转换实体 + 脱敏 List<UserAction> entityList = convert.convert(cleanData); List<UserAction> finalResult = mask.mask(entityList); // 4. 输出过滤器:打印最终结果 System.out.println("===== 管道过滤器最终输出结果 ====="); for (UserAction item : finalResult) { System.out.println(item); } } }