OutputFormat数据输出
1. OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有MapReduce的输出实现类都继承(实现)了OutputFormat。常见的实现类有TextOutputFormat(按行输出文本,也是Reduce默认的输出格式)、SequenceFileOutputFormat(输出二进制的SequenceFile,适合作为后续MapReduce任务的输入)等;当这些实现类不能满足需求时,可以自定义OutputFormat。
2.自定义OutputFormat
2.1 应用场景
例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。
2.2 自定义OutputFormat步骤
- 自定义一个类继承FileOutputFormat,重写getRecordWriter()方法,返回自定义的RecordWriter。
- 自定义一个类继承RecordWriter,重写输出数据的write()方法,并在close()方法中关闭打开的输出流。
3. 代码举例
需求:过滤输入的服务器log日志,将请求本地地址0.0.0.0的软件日志输出到local.log,其余日志输出到other.log。
java
/**
 * Configures and submits the log-splitting MapReduce job.
 *
 * <p>Usage: {@code LogDriver [<input> <output>]}. When fewer than two arguments are given, the
 * original hard-coded Windows paths are used. The output directory is required by
 * {@code FileOutputFormat} even though the custom OutputFormat picks its own file names, and
 * it must not exist before the job runs.
 */
public class LogDriver {

    /** Input file used when no command-line arguments are supplied. */
    private static final String DEFAULT_INPUT = "C:\\Users\\mi\\Desktop\\360TcpView.txt";

    /** Output directory used when no command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT = "E:\\output39";

    /**
     * Builds the job, runs it to completion and exits with status 0 on success, 1 on failure.
     *
     * @param args optional {@code <input> <output>} paths overriding the defaults
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        boolean useArgs = args.length >= 2;
        String input = useArgs ? args[0] : DEFAULT_INPUT;
        String output = useArgs ? args[1] : DEFAULT_OUTPUT;

        // 1. Create the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "logFile");
        job.setJarByClass(LogDriver.class);

        // 2. Wire the mapper and reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        // 3. Key/value types: map output and final output are both <Text, Text>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        // Previously a duplicated setMapOutputValueClass call, which left the final
        // output value class unset.
        job.setOutputValueClass(Text.class);

        // 4. Use the custom OutputFormat
        job.setOutputFormatClass(LogFileOutputFormat.class);

        // 5. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 6. Submit, wait, and report the result as the process exit status
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
java
/**
 * A {@code FileOutputFormat} that splits output records into two files.
 *
 * <p>Records whose key contains {@code "0.0.0.0"} go to {@code local.log}; all other records
 * go to {@code other.log}. Only the value is written, one per line. Both files are created in
 * the job's configured output directory ({@link FileOutputFormat#getOutputPath}) instead of a
 * hard-coded location.
 *
 * <p>NOTE(review): the file names are fixed, so this assumes a single task produces output
 * (e.g. one reduce task). With several reducers, each task would overwrite the others' files.
 */
public class LogFileOutputFormat extends FileOutputFormat<Text, Text> {

    /** Substring of the key that marks a request to the local address. */
    private static final String LOCAL_MARKER = "0.0.0.0";

    /** File receiving records whose key contains {@link #LOCAL_MARKER}. */
    private static final String LOCAL_FILE = "local.log";

    /** File receiving every other record. */
    private static final String OTHER_FILE = "other.log";

    /**
     * Opens both output files and returns a writer that routes records between them.
     *
     * @param job the task context; its configuration supplies the output directory and filesystem
     * @return a writer owning both streams, closed by {@code RecordWriter#close}
     * @throws IOException if the output directory is not set or a file cannot be created
     */
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Path outputDir = FileOutputFormat.getOutputPath(job);
        if (outputDir == null) {
            throw new IOException("Output directory not set for " + LogFileOutputFormat.class.getSimpleName());
        }
        FileSystem fileSystem = outputDir.getFileSystem(job.getConfiguration());

        FSDataOutputStream localLog = fileSystem.create(new Path(outputDir, LOCAL_FILE));
        FSDataOutputStream otherLog;
        try {
            otherLog = fileSystem.create(new Path(outputDir, OTHER_FILE));
        } catch (IOException | RuntimeException e) {
            // Don't leak the first stream if the second one cannot be opened.
            try {
                localLog.close();
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        return new SplitRecordWriter(localLog, otherLog);
    }

    /** Writes each record's value, newline-terminated, to one of two streams chosen by its key. */
    private static final class SplitRecordWriter extends RecordWriter<Text, Text> {

        private final FSDataOutputStream localLog;
        private final FSDataOutputStream otherLog;

        SplitRecordWriter(FSDataOutputStream localLog, FSDataOutputStream otherLog) {
            this.localLog = localLog;
            this.otherLog = otherLog;
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            FSDataOutputStream out = key.toString().contains(LOCAL_MARKER) ? localLog : otherLog;
            // Text already stores UTF-8 bytes. Write them directly (only the valid
            // getLength() prefix of the backing array) instead of decoding and re-encoding.
            out.write(value.getBytes(), 0, value.getLength());
            out.write('\n');
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // The finally block ensures the second stream is closed even if the first close fails.
            try {
                localLog.close();
            } finally {
                otherLog.close();
            }
        }
    }
}
java
/**
 * Emits every sufficiently long log line keyed by one of its fields.
 *
 * <p>The line is split on single spaces and the element at index 3 (trimmed) becomes the key.
 * Presumably this is the address column of the 360TcpView export; verify against the input.
 * The full original line is emitted as the value. Lines yielding fewer than four elements
 * are skipped.
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Index, in {@code line.split(" ")}, of the field used as the output key. */
    private static final int KEY_FIELD = 3;

    /** Reused output key; Hadoop copies/serializes the pair during write(), so reuse is safe. */
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        // The old guard (line.length() > 3) did not guarantee four elements, so split[3] threw
        // ArrayIndexOutOfBoundsException on short or single-token lines. Checking the element
        // count is sufficient: four elements always imply a line longer than 3 characters.
        if (fields.length <= KEY_FIELD) {
            return;
        }
        outKey.set(fields[KEY_FIELD].trim());
        context.write(outKey, value);
    }
}
java
/**
 * Identity reducer: writes every value unchanged under its key.
 */
public class LogReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // A plain loop lets IOException/InterruptedException propagate with their declared
        // types. Previously they were wrapped in RuntimeException inside a lambda, which hid
        // the real type and dropped the thread's interrupt status.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
运行结果: