在大规模日志处理场景中,HDFS 上通常存储大量以.gz压缩格式保存的 JSONL(JSON Lines)文件——即每行包含一个独立的 JSON 对象。本文介绍一种轻量、可移植且高效的解决方案:通过 Hadoop Streaming 配合 Python 脚本,将这些日志解析后批量写入 Elasticsearch(ES)。这种“多集群分布式处理 + 多节点并行写入”的架构,在生产环境中表现出良好的扩展性与稳定性。
整体流程概览
- Mapper:从 HDFS 读取
.gz压缩文件(Hadoop 自动解压),逐行解析 JSON,提取所需字段,输出标准化的 JSON 行。 - Reducer:接收 Mapper 输出,按批次调用 Elasticsearch 的
bulkAPI 写入目标索引。 - 构建分发:使用 PyInstaller 将 Python 脚本及依赖(如
elasticsearch)打包成两个独立可执行文件。 - 作业提交:通过 Hadoop Streaming 提交 MapReduce 任务,并利用
-archive机制将打包文件分发至各计算节点。
该方案对集群环境无侵入性,部署简单,特别适用于异构、受限或无法安装第三方库的生产环境。
一、Mapper 脚本:解析与字段提取
由于本场景不涉及分组或聚合操作,Mapper 可直接输出纯 JSON 字符串,无需遵循 key-value 格式,从而简化后续处理逻辑。
mapper.py
#!/usr/bin/env python3importsysimportjsonforlineinsys.stdin:line=line.strip()ifnotline:continuetry:data=json.loads(line)out={"id":data.get("id"),"name":data.get("name"),"timestamp":data.get("timestamp")}print(json.dumps(out,ensure_ascii=False)