Skip to content

OutputFormat数据输出

1. OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有MapReduce输出的实现类都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。 Alt text 其中,Reduce默认使用的输出格式是TextOutputFormat。

2.自定义OutputFormat

2.1 应用场景

例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。

2.2 自定义OutputFormat步骤

  1. 自定义一个类继承FileOutputFormat。
  2. 继承RecordWriter,具体改写输出数据的方法write()。

3. 代码举例

需求:过滤输入的服务器log日志,将请求本地0.0.0.0的软件日志输出到E:\output1\000.log,其余日志输出到E:\output1\other.log。

java
/**
 * Entry point that configures and submits the log-filtering MapReduce job.
 *
 * <p>The job wires {@code LogMapper} and {@code LogReducer} together, uses
 * {@code Text} for every key and value type, and writes its results through
 * the custom {@code LogFileOutputFormat}.
 */
public class LogDriver {
    /**
     * Builds the job, runs it to completion and prints {@code 0} when the job
     * succeeded or {@code 1} when it failed.
     *
     * @param args command-line arguments (not used; the paths are hard-coded below)
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a class configured on the job cannot be loaded
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Obtain the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "logFile");
        job.setJarByClass(LogDriver.class);

        // 2. Mapper / Reducer wiring
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        // 3. Intermediate (map output) key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Final (reduce output) key/value types.
        //    The value setter was previously a second setMapOutputValueClass call,
        //    which left the final output value class unset.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 5. Register the custom OutputFormat
        job.setOutputFormatClass(LogFileOutputFormat.class);

        // 6. Input file and job output directory
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\mi\\Desktop\\360TcpView.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\output39"));

        // 7. Submit, wait, and report the outcome (0 = success, 1 = failure)
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
java
/**
 * Output format that splits records across two log files depending on the key.
 *
 * <p>Records whose key contains {@code "0.0.0.0"} are appended to
 * {@code E:\output1\000.log}; every other record goes to
 * {@code E:\output1\other.log}. Both paths are fixed literals in this class;
 * the output directory configured on the job is not consulted here.
 */
public class LogFileOutputFormat extends FileOutputFormat<Text, Text> {
    /**
     * Opens both target files and returns a writer that routes each record to
     * one of them.
     *
     * @param job the task attempt context; its configuration is used to obtain the file system
     * @return a record writer that owns the two opened streams until {@code close} is called
     * @throws IOException          if the file system or either file cannot be opened
     * @throws InterruptedException declared by the overridden method; not thrown by this code
     */
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());

        final FSDataOutputStream localLog = fileSystem.create(new Path("E:\\output1\\000.log"));
        final FSDataOutputStream otherLog;
        try {
            otherLog = fileSystem.create(new Path("E:\\output1\\other.log"));
        } catch (IOException | RuntimeException e) {
            // Opening the second file failed: release the first stream so it does not leak.
            try {
                localLog.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        return new RecordWriter<Text, Text>() {

            /**
             * Writes {@code value} followed by a newline, UTF-8 encoded, to the
             * stream selected by {@code key}.
             */
            @Override
            public void write(Text key, Text value) throws IOException, InterruptedException {
                FSDataOutputStream target = key.toString().contains("0.0.0.0") ? localLog : otherLog;
                // Charset constant instead of the "utf8" name lookup; fully qualified
                // because this file shows no import section to extend.
                target.write((value.toString() + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }

            /**
             * Closes both streams; the second is closed even if closing the
             * first one throws.
             */
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    localLog.close();
                } finally {
                    otherLog.close();
                }
            }
        };
    }
}
java
/**
 * Mapper that keys every input line by its fourth space-separated field.
 *
 * <p>The original line is forwarded unchanged as the value, so downstream
 * stages can route the full record based on that field.
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * Emits {@code (fields[3].trim(), value)} for lines that have at least
     * four fields when split on a single space; shorter lines are skipped.
     *
     * @param key     the input key supplied by the input format (not used)
     * @param value   one line of the input
     * @param context the sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        // Guard on the number of fields, not the number of characters: a line
        // longer than 3 characters can still have fewer than 4 fields, and
        // indexing split[3] would then throw ArrayIndexOutOfBoundsException.
        if (split.length > 3) {
            String s = split[3].trim();
            context.write(new Text(s), value);
        }
    }
}
java
/**
 * Pass-through reducer: re-emits every value of a group under its key.
 */
public class LogReducer extends Reducer<Text, Text, Text, Text> {
    /**
     * Writes each value in {@code values} to the context paired with {@code key}.
     *
     * @param key     the group key
     * @param values  the values grouped under {@code key}
     * @param context the sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // A plain loop lets the declared checked exceptions propagate as-is
        // instead of being wrapped in RuntimeException inside a lambda.
        for (Text item : values) {
            context.write(key, item);
        }
    }
}

运行结果: Alt text