MapReduce Task Execution Flow

Execution Flow

The execution flow of a MapReduce program is roughly as shown in the figure below:

[Figure: MapReduce execution flow]

As the figure shows, a MapReduce program passes through these stages: data input → map → shuffle → reduce → data output.

Requirement

Now let's express this in code. Given the data set below, we will count how many lines it contains in total (and use the program to trace the execution flow):
11,iphone7,7000
11,iphone6,6000
11,iphone5,5000
22,ipad4,4000
22,ipad3,3000
22,ipad2,2000
22,ipad1,1000
33,macbook pro,15000
33,mac,14000
33,macbook air,13000
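
Before looking at the full Hadoop job, here is a plain-Java sketch of the same input → map → shuffle → reduce → output flow applied to this data. No Hadoop is involved; the class and its structure are purely illustrative, using the same constant-key trick as the job below:

import java.util.*;

// A plain-Java simulation of the input -> map -> shuffle -> reduce -> output
// flow for the line-count job below. No Hadoop here; purely illustrative.
public class FlowSketch {
    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "11,iphone7,7000", "11,iphone6,6000", "11,iphone5,5000",
                "22,ipad4,4000", "22,ipad3,3000", "22,ipad2,2000", "22,ipad1,1000",
                "33,macbook pro,15000", "33,mac,14000", "33,macbook air,13000");

        // map: each input record is turned into a (key, value) pair; like
        // MyFirstMapper below, we ignore the line content and emit ("abc", 1)
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : input) {
            mapOutput.add(new AbstractMap.SimpleEntry<>("abc", 1));
        }

        // shuffle: the framework sorts map output and groups the values by key
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // reduce: called once per key with all of that key's values
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            System.out.println(e.getKey() + "\t" + sum); // prints: abc	10
        }
    }
}

The point of Hadoop is that the map and shuffle/reduce steps above run distributed across many machines; the real job follows.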

MapReduce Code

package com.me;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class WordCount {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // load the cluster configuration and point the client at HDFS
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.set("fs.defaultFS", "hdfs://allin:9000");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hadoop jar xxxx <in> <out>");
            System.exit(2); // exit early instead of failing later with an index error
        }
        Job job = Job.getInstance(conf);
        job.setJobName("LineCount");
        job.setJarByClass(WordCount.class);

        // set up the map phase
        job.setMapperClass(MyFirstMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // shuffle: reuse the reducer as the combiner (safe here because
        // summing is associative and commutative)
        job.setCombinerClass(MyFirstReducer.class);

        // set up the reduce phase
        job.setReducerClass(MyFirstReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // input and output paths
        Path in  = new Path(otherArgs[0]);
        Path out = new Path(otherArgs[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyFirstMapper extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            System.err.println("调用map的setup方法");
        }

        // every line is mapped to the same constant key, so the reducer
        // will see a single group whose values sum to the line count
        public static Text defaultKey = new Text("abc");
        public static IntWritable ONE = new IntWritable(1);
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.err.println("调用map的map方法");
            context.write(defaultKey, ONE);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.err.println("调用map的cleanup方法");
        }
    }

    public static class MyFirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public static IntWritable result = new IntWritable();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            System.err.println("调用reduce的setup方法");
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.err.println("调用reduce的reduce方法");
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.err.println("调用reduce的cleanup方法");
        }
    }
}
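
Before running it, note what the result should be: every input line is mapped to the constant key "abc" with value 1, and the combiner/reducer sums those values, so the ten-line sample should produce exactly one output record, abc	10. With the tab separator and trailing newline that is 7 bytes, which you can check against the Bytes Written=7 counter in the output below.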

 

Running the Program

Set the program arguments, i.e. the input and output paths, to:
hdfs://allin:9000/test/all3c  hdfs://allin:9000/test/out/1
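
(These were passed as program arguments when running from the IDE, which is why the log below shows LocalJobRunner and warns "No job jar file set". Assuming the sample file was uploaded first, e.g. with hdfs dfs -put all3c /test/all3c, submitting to a cluster instead would look something like hadoop jar linecount.jar com.me.WordCount hdfs://allin:9000/test/all3c hdfs://allin:9000/test/out/1, where the jar name is hypothetical.)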

Program Output

18/02/12 10:52:00 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/02/12 10:52:00 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/02/12 10:52:00 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
18/02/12 10:52:00 INFO input.FileInputFormat: Total input paths to process : 1
18/02/12 10:52:00 INFO mapreduce.JobSubmitter: number of splits:1
18/02/12 10:52:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local21430329_0001
18/02/12 10:52:00 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
18/02/12 10:52:00 INFO mapreduce.Job: Running job: job_local21430329_0001
18/02/12 10:52:00 INFO mapred.LocalJobRunner: OutputCommitter set in config null
18/02/12 10:52:00 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
18/02/12 10:52:00 INFO mapred.LocalJobRunner: Waiting for map tasks
18/02/12 10:52:00 INFO mapred.LocalJobRunner: Starting task: attempt_local21430329_0001_m_000000_0
18/02/12 10:52:00 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
18/02/12 10:52:00 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@4fd02b85
18/02/12 10:52:00 INFO mapred.MapTask: Processing split: hdfs://allin:9000/test/all3c:0+159
18/02/12 10:52:00 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
18/02/12 10:52:00 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
18/02/12 10:52:00 INFO mapred.MapTask: soft limit at 83886080
18/02/12 10:52:00 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
18/02/12 10:52:00 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
18/02/12 10:52:00 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
map setup() called
map map() called
map map() called
map map() called
map map() called
map map() called
map map() called
map map() called
map map() called
map map() called
map map() called
map cleanup() called
18/02/12 10:52:01 INFO mapred.LocalJobRunner:
18/02/12 10:52:01 INFO mapred.MapTask: Starting flush of map output
18/02/12 10:52:01 INFO mapred.MapTask: Spilling map output
18/02/12 10:52:01 INFO mapred.MapTask: bufstart = 0; bufend = 80; bufvoid = 104857600
18/02/12 10:52:01 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214360(104857440); length = 37/6553600
reduce setup() called
reduce reduce() called
reduce cleanup() called
18/02/12 10:52:01 INFO mapred.MapTask: Finished spill 0
18/02/12 10:52:01 INFO mapred.Task: Task:attempt_local21430329_0001_m_000000_0 is done. And is in the process of committing
18/02/12 10:52:01 INFO mapred.LocalJobRunner: map
18/02/12 10:52:01 INFO mapred.Task: Task 'attempt_local21430329_0001_m_000000_0' done.
18/02/12 10:52:01 INFO mapred.LocalJobRunner: Finishing task: attempt_local21430329_0001_m_000000_0
18/02/12 10:52:01 INFO mapred.LocalJobRunner: map task executor complete.
18/02/12 10:52:01 INFO mapred.LocalJobRunner: Waiting for reduce tasks
18/02/12 10:52:01 INFO mapred.LocalJobRunner: Starting task: attempt_local21430329_0001_r_000000_0
18/02/12 10:52:01 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
18/02/12 10:52:01 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@42a537a5
18/02/12 10:52:01 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@56a3a097
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=1503238528, maxSingleShuffleLimit=375809632, mergeThreshold=992137472, ioSortFactor=10, memToMemMergeOutputsThreshold=10
18/02/12 10:52:01 INFO reduce.EventFetcher: attempt_local21430329_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
18/02/12 10:52:01 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local21430329_0001_m_000000_0 decomp: 12 len: 16 to MEMORY
18/02/12 10:52:01 INFO reduce.InMemoryMapOutput: Read 12 bytes from map-output for attempt_local21430329_0001_m_000000_0
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 12, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->12
18/02/12 10:52:01 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
18/02/12 10:52:01 INFO mapred.LocalJobRunner: 1 / 1 copied.
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
18/02/12 10:52:01 INFO mapred.Merger: Merging 1 sorted segments
18/02/12 10:52:01 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 6 bytes
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: Merged 1 segments, 12 bytes to disk to satisfy reduce memory limit
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: Merging 1 files, 16 bytes from disk
18/02/12 10:52:01 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
18/02/12 10:52:01 INFO mapred.Merger: Merging 1 sorted segments
18/02/12 10:52:01 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 6 bytes
18/02/12 10:52:01 INFO mapred.LocalJobRunner: 1 / 1 copied.
18/02/12 10:52:01 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce setup() called
reduce reduce() called
reduce cleanup() called
18/02/12 10:52:01 INFO mapred.Task: Task:attempt_local21430329_0001_r_000000_0 is done. And is in the process of committing
18/02/12 10:52:01 INFO mapred.LocalJobRunner: 1 / 1 copied.
18/02/12 10:52:01 INFO mapred.Task: Task attempt_local21430329_0001_r_000000_0 is allowed to commit now
18/02/12 10:52:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local21430329_0001_r_000000_0' to hdfs://allin:9000/test/out/1/_temporary/0/task_local21430329_0001_r_000000
18/02/12 10:52:01 INFO mapred.LocalJobRunner: reduce > reduce
18/02/12 10:52:01 INFO mapred.Task: Task 'attempt_local21430329_0001_r_000000_0' done.
18/02/12 10:52:01 INFO mapred.LocalJobRunner: Finishing task: attempt_local21430329_0001_r_000000_0
18/02/12 10:52:01 INFO mapred.LocalJobRunner: reduce task executor complete.
18/02/12 10:52:01 INFO mapreduce.Job: Job job_local21430329_0001 running in uber mode : false
18/02/12 10:52:01 INFO mapreduce.Job:  map 100% reduce 100%
18/02/12 10:52:01 INFO mapreduce.Job: Job job_local21430329_0001 completed successfully
18/02/12 10:52:01 INFO mapreduce.Job: Counters: 38
File System Counters
FILE: Number of bytes read=352
FILE: Number of bytes written=508236
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=318
HDFS: Number of bytes written=7
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Map-Reduce Framework
Map input records=10
Map output records=10
Map output bytes=80
Map output materialized bytes=16
Input split bytes=93
Combine input records=10
Combine output records=1
Reduce input groups=1
Reduce shuffle bytes=16
Reduce input records=1
Reduce output records=1
Spilled Records=2
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=6
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=511705088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=159
File Output Format Counters
Bytes Written=7

Process finished with exit code 0

Analysis

The output above shows the call order of the overridden methods in the map and reduce phases. In the map task, setup() runs once, map() runs once per input record (ten times here, matching Map input records=10), and cleanup() runs once at the end. Notice that the first reduce setup()/reduce()/cleanup() trio appears inside the map task, between "Spilling map output" and "Finished spill 0": that is MyFirstReducer acting as the combiner (set via job.setCombinerClass) on the map-side spill, collapsing the ten ("abc", 1) pairs into a single ("abc", 10) record, as the counters confirm (Combine input records=10, Combine output records=1). The second trio is the real reduce task: it receives that single group (Reduce input records=1) and writes the final result, abc	10.
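
This ordering follows directly from how the framework drives a task: each Mapper is executed through a run() template method, and Reducer has a symmetrical one. The sketch below is a simplified rendering of what Hadoop's base Mapper#run does; treat it as illustrative rather than a verbatim copy of the source:

// Simplified sketch of org.apache.hadoop.mapreduce.Mapper#run:
// setup() once, then map() for every record in the split, then cleanup().
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

Reducer#run has the same shape, except the loop advances by key and hands reduce() an Iterable over that key's values, which is why reduce() ran only once here.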

