MapReduce Programming

1. Hadoop Data Types and Their Java Counterparts

| Java type | Hadoop Writable type |
|-----------|----------------------|
| Boolean   | BooleanWritable      |
| Byte      | ByteWritable         |
| Int       | IntWritable          |
| Float     | FloatWritable        |
| Long      | LongWritable         |
| Double    | DoubleWritable       |
| String    | Text                 |
| Map       | MapWritable          |
| Array     | ArrayWritable        |
| Null      | NullWritable         |
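
As a quick illustration (a minimal sketch, not part of the original notes), each Writable wraps its Java counterpart and converts back via get() or toString():

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Wrap plain Java values in their Hadoop Writable counterparts
        IntWritable count = new IntWritable(42);
        Text word = new Text("hello");
        // Unwrap them back into plain Java types
        int n = count.get();         // 42
        String s = word.toString();  // "hello"
        System.out.println(s + " -> " + n);
    }
}
```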

2. MapReduce Programming Conventions

A user-written MapReduce program is divided into three parts: the Mapper, the Reducer, and the Driver.

2.1 Mapper Phase

  1. A user-defined Mapper must extend the framework's Mapper parent class
  2. The Mapper's input data takes the form of KV pairs (the K and V types are customizable)
  3. The Mapper's business logic is written in the map() method
  4. The Mapper's output data also takes the form of KV pairs (the K and V types are customizable)
  5. The map() method (run by the MapTask process) is called once for each input <K,V> pair; see the skeleton after this list
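
A minimal skeleton of these conventions (the class name MyMapper and the concrete KV types are illustrative placeholders, not from the notes):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input KV: <byte offset of line, line contents>; output KV: <Text, IntWritable>
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Business logic goes here; the framework calls map() once per input <K,V> pair
        context.write(new Text(value.toString()), new IntWritable(1));
    }
}
```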

2.2 Reducer Phase

  1. A user-defined Reducer must extend the framework's Reducer parent class
  2. The Reducer's input data types correspond to the Mapper's output data types, and are likewise KV pairs
  3. The Reducer's business logic is written in the reduce() method
  4. The ReduceTask process calls the reduce() method once for each group of <k,v> pairs that share the same k; see the skeleton after this list
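
A matching Reducer skeleton (again with placeholder names; the KV types mirror the Mapper skeleton above):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// The input KV types <Text, IntWritable> match the Mapper's output KV types
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Business logic goes here; the framework calls reduce() once per group
        // of values that share the same key
    }
}
```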

2.3 Driver Phase

The Driver is effectively the client of the YARN cluster: it submits the entire program to YARN in the form of a Job object that encapsulates the MapReduce job's runtime parameters.

3. Hands-On: WordCount

  1. The pom.xml file:
```xml
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <maven.compiler.compilerVersion>11</maven.compiler.compilerVersion>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.6</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.36</version>
    </dependency>
    <!-- For Chinese word segmentation -->
    <dependency>
        <groupId>org.truenewx</groupId>
        <artifactId>ik-analyzer-core</artifactId>
        <version>5.0.1</version>
    </dependency>
</dependencies>
```

  2. In the project's src/main/resources directory, create a new file log4j.properties:
```ini
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
```
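
Note that only the stdout appender is attached to the rootLogger here; the logfile appender (target/spring.log) is defined but will not receive output unless it is also added to a logger.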
  3. Write the program code:
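
First, the Mapper: it wraps each input line in a Reader for the IK Analyzer segmenter and emits <word, 1> for every token. The class declaration and imports below are filled in around the original snippet; the IK import paths assume the library's standard org.wltea packages.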
```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// NOTE: assumes the standard IK Analyzer package names
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get one line and wrap it in the Reader the segmenter expects.
        //   toString() is used rather than getBytes(): Text.getBytes() returns the
        //   backing array, which can contain stale bytes past getLength().
        Reader read = new StringReader(value.toString());
        // 2 Segment the line (true = smart segmentation mode)
        IKSegmenter iks = new IKSegmenter(read, true);
        Lexeme t;
        while ((t = iks.next()) != null) {
            // 3 Emit <word, 1> for each token
            word.set(t.getLexemeText());
            context.write(word, one);
        }
    }
}
```
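
Next, the Reducer sums the counts collected for each word: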
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 1 Add up all the counts for this word
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 2 Emit <word, total>
        result.set(sum);
        context.write(key, result);
    }
}
```
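
Finally, the Driver wires the pieces together and submits the job; the process exits with status 0 when the job succeeds: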
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1 Get the configuration and create the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("中文计数");
        // 2 Associate this Driver's jar
        job.setJarByClass(WordCountDriver.class);
        // 3 Associate the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4 Set the Mapper output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 Set the input and output paths
        FileInputFormat.addInputPath(job, new Path("C:\\Users\\mi\\Downloads\\斗破苍穹.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\mi\\Downloads\\output\\"));
        // 7 Submit the job; exit with 0 on success, 1 on failure
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
  4. Once the local test run succeeds, package the job and upload it to the cluster. Add the following build configuration to pom.xml:
```xml
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

After packaging with Maven, upload the jar to the Hadoop server and run it:

```sh
[jack@hadoop102 hadoop-3.3.6]$ hadoop jar wc.jar com.rocket.mapreduce.WordCountDriver /user/jack/input /user/jack/output
```
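
When the job finishes, the word counts appear in the part-r-* files under /user/jack/output (viewable with hdfs dfs -cat, for example).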