将MapReduce计算结果,输出到多个文件或文件夹2

了解了MultipleOutputs类的基本用法,关于多路径输出可以有多种用法。

再举个例子

依然使用测试数据文件all3c,内容如下:

11,iphone7,7000

11,iphone6,6000

11,iphone5,5000

22,ipad4,4000

22,ipad3,3000

22,ipad2,2000

22,ipad1,1000

33,macbook pro,15000

33,mac,14000

33,macbook air,13000

数据主要由3列构成,分别表示类别编号,设备类型,价格。

这次要求,根据价格区间划分数据,即:

5000以下放到一个文件中;5000到10000放到一个文件中;10000以上的放到一个文件中。

 

也就是输出结果为:

 

再来看一下,三个输出文件中的数据

 

xiaoyu5000-r-00000

 

xiaoyu10000-r-00000

 

dayu10000-r-00000

 

那具体功能实现的代码,如下:

 

com.me;

org.apache.hadoop.conf.Configuration;
org.apache.hadoop.fs.FileSystem;
org.apache.hadoop.fs.Path;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

java.io.IOException;

MultiOutputTest2 {
    main(String[] args) IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = Configuration();
        Job job = Job.(configuration);
        job.setJarByClass(MultiOutputTest2.);
        job.setMapperClass(MutiTestMapper.);
        job.setMapOutputKeyClass(IntWritable.);
        job.setMapOutputValueClass(Text.);

        job.setReducerClass(MutiTestReducer.);
        job.setOutputKeyClass(NullWritable.);
        job.setOutputValueClass(Text.);

        FileInputFormat.(job, Path(args[]));
        MultipleOutputs.(job, , TextOutputFormat.,NullWritable., Text.);
        MultipleOutputs.(job, , TextOutputFormat., NullWritable., Text.);
        MultipleOutputs.(job, , TextOutputFormat., NullWritable., Text.);

        LazyOutputFormat.(job, TextOutputFormat.);

        Path outputPath = Path(args[]);
        FileSystem fs = FileSystem.(configuration);
        (fs.exists(outputPath)){
            fs.delete(outputPath, );
        }
        FileOutputFormat.(job, outputPath);
        job.waitForCompletion();
    }

    MutiTestMapper Mapper<LongWritable, Text, IntWritable, Text>{
        map(LongWritable key, Text value, Context context) IOException, InterruptedException {
            String line = value.toString().trim();
            (!= line && != line.length()){
                String[] arr = line.split();
                context.write( IntWritable(Integer.(arr[])) , value );
            }
        }
    }

    MutiTestReducer Reducer<IntWritable, Text, NullWritable, Text>{

        MultipleOutputs<NullWritable, Text> = ;

        setup(Context context) IOException, InterruptedException {
            = MultipleOutputs<NullWritable, Text>(context);
        }

        reduce(IntWritable key, Iterable<Text> values, Context context) IOException, InterruptedException {
            (Text value : values ){
                (key.get() < ){
                    .write(, NullWritable.(), value);
                } (key.get() >= && key.get() <= ){
                    .write(, NullWritable.(), value);
                } {
                    .write(, NullWritable.(), value);
                }
            }
        }

        cleanup(Context context) IOException, InterruptedException {
            (!= ){
                .close();
                = ;
            }
        }
    }
}

 

说明:这里需要看一下map阶段的输出内容,因为要根据key进行分组操作,所以将价格作为key,整条记录作为值,来做处理。

 

Game Over !!!

 

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注