Skip to content

Yarn的Tool接口

1. 问题回顾

sh
[jack@hadoop102 hadoop-3.3.6]$ hadoop jar wc.jar com.rocket.mapreduce.wordcount2.WordCountDriver -Dmapreduce.job.queuename=root.test /input /output1

报错提示/input已存在,/input是输入目录,原因是程序已经把Dmapreduce.job.queuename=root.test作为程序的第一个参数。 Alt text

2. 解决办法

可以借助Yarn的Tool接口,实现命令行执行动态传入参数

3. 具体步骤

  1. 新建Maven项目YarnDemo,pom.xml如下:
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jack.hadoop</groupId>
    <artifactId>yarn_tool_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.6</version>
        </dependency>
    </dependencies>
</project>
  1. 创建类WordCount并实现Tool接口
java
public class WordCount implements Tool {

    private Configuration conf;
    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text outK = new Text();
        private IntWritable outV = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                outK.set(word);
                context.write(outK, outV);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outV = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            context.write(key, outV);
        }
    }
}
  1. 创建WordCountDriver类
java
public class WordCountDriver {
    private static Tool tool;

    public static void main(String[] args) throws Exception {
        // 1. 创建配置文件
        Configuration conf = new Configuration();
        // 2. 判断是否有tool接口
        switch (args[0]){
            case "wordcount":
                tool = new WordCount();
                break;
            default:
                throw new RuntimeException(" No such tool: "+ args[0] );
        }
        // 3. 用Tool执行程序
        // Arrays.copyOfRange 将老数组的元素放到新数组里面
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(run);
    }
}
  1. 在HDFS上准备输入文件,假设为/input目录,向集群提交该Jar包
sh
[jack@hadoop102 hadoop-3.3.6]$ yarn jar YarnDemo.jar com.jack.yarn.WordCountDriver wordcount /input /output

提示

此时提交的3个参数,第一个用于生成特定的Tool,第二个和第三个为输入输出目录。此时如果我们希望加入设置参数,可以在wordcount后面添加参数,例如: /input /output