将MapReduce计算结果,输出到多个文件或文件夹1

场景描述:

在使用MapReduce处理数据时,经常会遇到一种数据输出的场景:根据key,或根据某个标记,将数据各自单独输出所属的目录或文件中。这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事。

 

输出到多个文件或多个文件夹

驱动中不需要额外改变,只需要在MapClass或Reduce类中加入如下代码

private MultipleOutputs<Text,IntWritable> mos;

public void setup(Context context) throws IOException,InterruptedException {

mos = new MultipleOutputs(context);

}

public void cleanup(Context context) throws IOException,InterruptedException {

mos.close();

}

然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value); 

 

举个例子

测试数据文件all3c,内容如下:

11,iphone7,7000

11,iphone6,6000

11,iphone5,5000

22,ipad4,4000

22,ipad3,3000

22,ipad2,2000

22,ipad1,1000

33,macbook pro,15000

33,mac,14000

33,macbook air,13000

数据主要由3列构成,分别表示类别编号,设备类型,价格。

要求,根据类别编号划分文件,将编号为11、22、33的数据各放到一个文件中。也就是输出结果为:

11-r-00000

11,iphone7,7000

11,iphone6,6000

11,iphone5,5000

22-r-00000

22,ipad4,4000

22,ipad3,3000

22,ipad2,2000

22,ipad1,1000

33-r-00000

33,macbook pro,15000

33,mac,14000

33,macbook air,13000

首先准备好all3c这个文件,并将数据上传到hdfs上的/test目录下

 

编写MapReduce程序,处理数据

 

com.me;

org.apache.hadoop.conf.Configuration;
org.apache.hadoop.fs.FileSystem;
org.apache.hadoop.fs.Path;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

java.io.IOException;

MultiOutputTest {
    main(String[] args) IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = Configuration();
        Job job = Job.(configuration);
        job.setJarByClass(MultiOutputTest.);
        job.setMapperClass(MutiTestMapper.);
        job.setMapOutputKeyClass(IntWritable.);
        job.setMapOutputValueClass(Text.);

        job.setReducerClass(MutiTestReducer.);
        job.setOutputKeyClass(NullWritable.);
        job.setOutputValueClass(Text.);

        FileInputFormat.(job, Path(args[]));
        MultipleOutputs.(job, , TextOutputFormat.,NullWritable., Text.);
        MultipleOutputs.(job, , TextOutputFormat., NullWritable., Text.);

        LazyOutputFormat.(job, TextOutputFormat.);

        Path outputPath = Path(args[]);
        FileSystem fs = FileSystem.(configuration);
        (fs.exists(outputPath)){
            fs.delete(outputPath, );
        }
        FileOutputFormat.(job, outputPath);
        job.waitForCompletion();
    }

    MutiTestMapper Mapper<LongWritable, Text, IntWritable, Text>{
        map(LongWritable key, Text value, Context context) IOException, InterruptedException {
            String line = value.toString().trim();
            (!= line && != line.length()){
                String[] arr = line.split();
                context.write( IntWritable(Integer.(arr[])) , value );
            }
        }
    }

    MutiTestReducer Reducer<IntWritable, Text, NullWritable, Text>{

        MultipleOutputs<NullWritable, Text> = ;

        setup(Context context) IOException, InterruptedException {
            = MultipleOutputs<NullWritable, Text>(context);
        }
        
        reduce(IntWritable key, Iterable<Text> values, Context context) IOException, InterruptedException {
            (Text value : values ){
                .write(, NullWritable.(), value, key.toString()++ key.toString());
                .write(, NullWritable.(), value);
            }
        }

        cleanup(Context context) IOException, InterruptedException {
            (!= ){
                .close();
                = ;
            }
        }
    }
}

 

输入参数,并执行

 

查看执行结果

 

在代码中,有两个输出,代替了context.write

mos.write(“first”, NullWritable.get(), value, key.toString()+”/” + key.toString());

mos.write(“second”, NullWritable.get(), value);

这里的first和second,是我们给输出打的标记。

second,直接对数据进行了输出,文件名为second-r-00000;

而first,并没有输出带有first的文件名,而是以key划分文件,并输出。

 

补充内容1

在Main函数中,有这样一句话

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

如果不添加这句话,在/test/out/1/下,会有一个内容为空的文件,名为part-r-00000,这个文件是没有意义的。

 

补充内容2

multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。

如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn),即11-r-00000、22-r-00000、33-r-00000

如果包含文件分隔符“/”,例如baseOutputPath=key.tostring  + “/” ,那么输出文件则为11/-r-nnnnn、22/-r-nnnnn、33/-r-nnnnn

例如baseOutputPath=key.tostring  + “/” + key.tostring,那么输出文件则为11/11-r-nnnnn、22/22-r-nnnnn、33/33-r-nnnnn

 

补充内容3

在reduce阶段,需要先在setup方法中,设置多文件输出mos。

使用完毕后,需要在cleanup方法中,调用mos的close方法,否则最后的输出为空。

 

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注