MapReduce是Hadoop生态系统的分布式计算框架,采用分而治之的设计思想,将大规模数据集拆分成多个小数据块,在集群节点上并行处理,最后汇总结果。本次实验从三个维度深入掌握MapReduce:Shell命令操作、YARN Web界面监控和Java API编程实现,并以经典的WordCount词频统计为例,完整演示分布式程序的开发流程。
一、MapReduce核心原理与架构
1.1 执行流程概览
MapReduce执行阶段详解:
| 阶段 | 功能 | 说明 |
|---|---|---|
| InputFormat | 数据输入 | 读取HDFS数据,按块(Block)切分成InputSplit |
| Mapper | 映射处理 | 将输入键值对转换为中间键值对(如<word, 1>) |
| Shuffle | 洗牌排序 | 对Mapper输出进行分区、排序、合并,传输给Reducer |
| Reducer | 归约聚合 | 对相同Key的Value进行汇总计算(如求和) |
| OutputFormat | 结果输出 | 将最终结果写入HDFS |
1.2 WordCount词频统计流程图
以输入文本hello world hello hadoop为例:
输入数据 → Mapper处理 → Shuffle排序 → Reducer聚合 → 输出结果 hello world → <hello,1> → <hello,<1,1>> → <hello,2> hello hadoop <world,1> <world,1> <world,1> <hello,1> <hadoop,1> <hadoop,1> <hadoop,1>🖥️ 实验一:常用Shell命令
1.1 计算圆周率(官方示例)
Hadoop自带了hadoop-mapreduce-examplesjar包,可直接运行测试:
yarnjar$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi1010参数说明:
pi:运行计算圆周率程序10:Map任务数量10:每个Map任务的采样点数
💡实验现象:任务提交后,可在终端看到进度输出,最终返回π的估算值。
1.2 YARN应用管理命令
①-list:列出所有Application
yarnapplication-list输出示例:
Application-Id Application-Name Application-Type User Queue State Final-State Progress application_1762656973821_0002 word count MAPREDUCE lyq default RUNNING UNDEFINED 50%②-status:查看任务状态
yarnapplication-statusapplication_1762656973821_0002返回信息:
- 应用ID、名称、类型
- 运行状态(NEW/SUBMITTED/ACCEPTED/RUNNING/FINISHED/FAILED/KILLED)
- 启动时间、运行时长
- 资源分配(内存、CPU)
③-kill:停止任务
yarnapplication-killapplication_1762656973821_0002⚠️使用场景:任务卡死、资源占用过高或提交错误时,及时终止释放资源。
1.3 Shell命令速查表
| 命令 | 功能 | 示例 |
|---|---|---|
yarn jar <jar> <mainClass> | 提交MapReduce作业 | yarn jar examples.jar pi 10 10 |
yarn application -list | 查看所有应用 | 显示Running/Finished状态 |
yarn application -status <id> | 查看应用详情 | 包含进度、资源、日志链接 |
yarn application -kill <id> | 终止应用 | 强制停止任务释放资源 |
yarn node -list | 查看集群节点 | 显示Active/Decommissioned节点 |
🌐 实验二:YARN Web管理界面
2.1 访问方式
在浏览器地址栏输入:
http://hadoop102:8088📌 YARN ResourceManager默认端口为8088
2.2 Web界面功能展示
YARN Web UI核心功能:
| 菜单 | 功能说明 |
|---|---|
| Cluster | 查看集群整体状态、版本信息、HA状态 |
| Cluster Metrics | 显示提交/运行/完成的应用数、容器数、内存使用 |
| Scheduler | 查看资源调度策略(Capacity/FIFO Scheduler) |
| Applications | 查看所有应用列表,包含ID、名称、用户、状态、进度 |
| Nodes | 查看所有NodeManager节点状态、资源容量、容器数 |
| Tools | 提供本地日志查看、配置检查等工具 |
2.3 实际应用场景
在运行WordCount任务时,通过Web界面可以:
- 实时监控进度:观察Map和Reduce任务的完成百分比
- 快速定位问题:当Reduce任务失败时,通过日志链接查看错误原因
- 资源分析:查看任务占用的内存和CPU,优化资源分配
💡踩坑记录:一次Reduce任务失败,通过Web界面查看日志发现是输出目录已存在导致,删除目录后任务顺利执行。这种"可视化监控+日志排查"的组合,大幅提升了问题解决效率!
☕ 实验三:Java编程实现WordCount
3.1 项目结构
MyFirstMapReduceDemo ├── pom.xml # Maven配置 └── src/main/java/com/mapreduce/ ├── MyMapper.java # Mapper类 ├── MyReduce.java # Reducer类 └── Driver.java # 驱动类(程序入口)3.2 Maven依赖配置
<dependencies><!-- Hadoop Common --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><!-- MapReduce Client --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.3</version></dependency><!-- MapReduce JobClient --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.1.3</version></dependency></dependencies>3.3 核心代码实现
① Mapper类:数据拆分与映射
packagecom.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/** * Mapper类 * 输入键值对:<行号, 文本行> * 输出键值对:<单词, 1> */publicclassMyMapperextendsMapper<LongWritable,Text,Text,IntWritable>{// 常量1,避免在map方法中重复创建对象privatefinalIntWritablenum=newIntWritable(1);privatefinalTexttext=newText();@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{// 1. 将Text转换为StringStringline=value.toString();// 2. 按空格拆分单词String[]words=line.split(" ");// 3. 输出<单词, 1>键值对for(Stringword:words){text.set(word);context.write(text,num);// eg: hello -> (hello, 1)}}}关键设计说明:
| 类型 | 作用 | 对应Java类型 |
|---|---|---|
LongWritable | 输入Key(行偏移量) | long |
Text | 输入Value(文本行)/ 输出Key(单词) | String |
IntWritable | 输出Value(计数1) | int |
💡性能优化:将
IntWritable和Text对象定义为成员变量,避免在循环中重复创建,减少GC压力。
② Reducer类:结果聚合与归约
packagecom.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;/** * Reducer类 * 输入键值对:<单词, <1,1,1,...>> * 输出键值对:<单词, 总次数> */publicclassMyReduceextendsReducer<Text,IntWritable,Text,IntWritable>{privateintsum;privatefinalIntWritableresult=newIntWritable();@Overrideprotectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{// 1. 初始化累加器sum=0;// 2. 遍历所有相同单词的计数,累加求和for(IntWritablevalue:values){sum+=value.get();}// 3. 输出<单词, 总次数>result.set(sum);context.write(key,result);// eg: (hello, <1,1,1>) -> (hello, 3)}}Reduce执行逻辑图解:
输入: <hello, [1, 1, 1]> 输入: <world, [1]> ↓ ↓ sum = 0 sum = 0 sum += 1 → 1 sum += 1 → 1 sum += 1 → 2 sum += 1 → 3 ↓ ↓ 输出: <hello, 3> 输出: <world, 1>③ Driver驱动类:任务配置与提交
packagecom.mapreduce;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * Driver驱动类:程序入口 * 负责配置Job参数、关联Mapper和Reducer、设置输入输出路径 */publicclassDriver{publicstaticvoidmain(String[]args)throwsException{// 1. 获取配置信息及Job对象Configurationconf=newConfiguration();Jobjob=Job.getInstance(conf);// 2. 关联本Driver程序的jar(main方法所在类)job.setJarByClass(Driver.class);// 3. 关联Mapper和Reducer类job.setMapperClass(MyMapper.class);job.setReducerClass(MyReduce.class);// 4. 设置Mapper输出的Key-Value类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5. 设置最终输出的Key-Value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6. 设置输入和输出路径// 方式一:Windows本地运行(测试环境)// FileInputFormat.setInputPaths(job, new Path("D:\\input"));// FileOutputFormat.setOutputPath(job, new Path("D:\\output"));// 方式二:Linux集群运行(生产环境,通过args传参)FileInputFormat.setInputPaths(job,newPath(args[0]));FileOutputFormat.setOutputPath(job,newPath(args[1]));// 7. 提交Job并等待完成booleanresult=job.waitForCompletion(true);System.exit(result?0:1);}}Driver配置参数详解:
| 方法 | 作用 | 说明 |
|---|---|---|
setJarByClass() | 设置Jar包位置 | 通过反射找到包含main方法的类 |
setMapperClass() | 关联Mapper类 | 指定数据映射逻辑 |
setReducerClass() | 关联Reducer类 | 指定数据归约逻辑 |
setMapOutputKeyClass() | Map输出Key类型 | 必须与Mapper输出一致 |
setMapOutputValueClass() | Map输出Value类型 | 必须与Mapper输出一致 |
setOutputKeyClass() | 最终输出Key类型 | 通常与Map输出一致 |
setOutputValueClass() | 最终输出Value类型 | 通常与Map输出一致 |
FileInputFormat.setInputPaths() | 输入路径 | 支持文件或目录 |
FileOutputFormat.setOutputPath() | 输出路径 | 必须不存在,否则报错 |
waitForCompletion() | 提交并等待 | true表示打印进度信息 |
3.4 程序打包与集群运行
步骤1:Maven打包
先进行clean:
然后进行package
生成MyFirstMapReduceDemo-1.0-SNAPSHOT.jar
步骤2:启动Hadoop集群
# 在Master节点执行start-dfs.sh start-yarn.sh# 验证jps# 应看到: NameNode, DataNode, ResourceManager, NodeManager, JobHistoryServer步骤3:上传Jar包到HDFS/本地
# 将jar包上传到Linux服务器scpMyFirstMapReduceDemo-1.0-SNAPSHOT.jar lyq@hadoop101:/home/lyq/步骤4:准备输入数据
# 创建输入文件echo"hello world hello hadoop">word.txtecho"hello mapreduce hello yarn">>word.txt# 上传到HDFShadoop fs-mkdir-p/input hadoop fs-putword.txt /input/步骤5:运行MapReduce作业
hadoop jar MyFirstMapReduceDemo-1.0-SNAPSHOT.jar com.mapreduce.Driver /input/word.txt /output命令解析:
hadoop jar:提交Jar包运行com.mapreduce.Driver:主类全限定名/input/word.txt:HDFS输入路径/output:HDFS输出路径(必须不存在)
步骤6:查看运行结果
# 查看输出目录hadoop fs-ls/output# 查看结果文件hadoop fs-cat/output/part-r-00000预期输出:
hadoop 1 hello 4 mapreduce 1 world 1 yarn 1🎉成功!分布式词频统计完成,结果已写入HDFS。
3.5 运行监控截图
在任务运行期间,访问YARN Web UI查看实时进度,点击Application ID可进入详情页,查看Map/Reduce任务进度、日志信息和资源使用:
![]()
📝 实验心得与总结
🎯 核心收获
| 维度 | 收获 |
|---|---|
| Shell命令 | 掌握yarn jar提交作业、yarn application管理应用,理解资源调度机制 |
| Web界面 | 可视化监控任务生命周期,通过日志快速定位问题(如输出目录已存在) |
| Java编程 | 深入理解Mapper-Reduce编程模型,掌握Job配置与集群部署流程 |
🔧 踩坑记录与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 输出目录已存在 | MapReduce不允许覆盖已有目录 | 删除旧目录或更换新路径 |
| ClassNotFoundException | Maven依赖未正确引入 | 检查hadoop-mapreduce-client依赖 |
| 权限不足 | HDFS目录权限限制 | 使用hadoop fs -chmod修改权限 |
| 任务卡在RUNNING状态 | 资源不足或数据倾斜 | 调整mapreduce.job.reduces参数 |
| 本地运行正常,集群失败 | 路径格式或依赖问题 | 使用HDFS路径,确保Jar包包含所有依赖 |
🧠 设计思想深度理解
通过本次实验,深刻理解了MapReduce的**“分而治之”**哲学:
- Map阶段:将大数据集拆分成多个小数据块,在节点上并行处理,实现数据局部性(移动计算而非移动数据)
- Shuffle阶段:自动完成数据排序、分区、传输,隐藏分布式通信复杂性
- Reduce阶段:对中间结果聚合汇总,输出最终结果
💡关键认知:MapReduce的核心价值不在于单机性能,而在于水平扩展能力——增加节点即可线性提升处理能力,适合PB级数据分析。