hadoop全局静态变量

开发MapReduce过程中,经常忽略了一个知识点。那就是全局变量的用法。
从代码方面看下Map任务执行过程(Reduce也类似)
通过上面这段代码,可以看出:
setup,执行一次,初始化操作
map,无限次调用,对每一行数据进行处理。
cleanup,执行一次,清理操作。
根据如上结构,要传递某个数据,我们可以这样搞:
在job的设置过程中,配置configuration对象,并设置参数和值。在这个环节,把数据传递进去(通常这种数据量,比较小)。然后在map或reduce过程中,再取出来。例如:
首先,添加变量值
Configuration conf = new Configuration();
p2 = "bbbbbb";
conf.set("param", p2);
然后,在setup函数中取变量值
public static String p4 = "";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    p4 = context.getConfiguration().get("param");
}
举个例子
现有一个文本文件,内容有10行,内容为:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://allin:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/data/tmp/hadoop/tmp</value>
    </property>
</configuration>
写MR代码:
package com.param;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class RunMR {

    public static String p1 = "aa";
    public static String p2= "bb";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://allin:9000");
        p1 = "aaaaaa";
        p2 = "bbbbbb";
        conf.set("param", p2);

        Job job = Job.getInstance(conf);
        job.setJobName("TestStaticParams");
        job.setJarByClass(RunMR.class);

        job.setMapperClass(RunMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(RunReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String inString = "/test/3c";
        String outString = "/test/out/1";
        Path in = new Path(inString);
        Path out = new Path(outString);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion( true ) ? 1 : 0 );
    }

    public static class RunMap extends Mapper<Object, Text, Text, Text> {
        public static String p3 = "cc";
        public static String p4 = "";
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            p4 = context.getConfiguration().get("param");
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            super.run(context);
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String tmp = p1 + "_" + p2 + "_" + p3 + "_" + p4;
            System.err.println(tmp);
            context.write(new Text(tmp), new Text(""));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }
    
    public static class RunReduce extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text tt:values ) {
                context.write(key, tt);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }
}

 

接下来,分析下,如果执行此程序,会有什么样的输出结果。

快想~~~~~~~~~~~
执行程序:
我们把程序打包,打包后文件名 为daniu.jar。
一种是,使用java -jar daniu.jar来执行;
另一种是,使用hadoop jar daniu.jar来执行;
输出结果是不一样的。
java -jar的输出:
hadoop jar的输出:
解析:
不同的task共享的只是jar文件的初始版本,且分别运行在独立的JVM中,所以不同的task对Mapper(Reducer)实现类中成员变量以及主类中的成员变量的修改相互没有影响。
所以,此时要考虑,任务处理过程中,使用的数据,如何在Map及Reduce阶段进行传递。
在MapReduce中,全局共享数据的处理办法,通常有四种:
第一种:读写HDFS文件
主要使用了java的API,预先定义数据的存储规则,通过读写HDFS中指定的文件按照预先定义的规则访问数据即可实现全局访问
第二种:配置Job属性
在job的设置过程中,通常会配置configuration对象,并设置一些参数。所以我们可以在这个环节,把数据传递进去。通常这种数据量,比较小。
然后在map或reduce过程中,再取出来。例如:
添加变量值:
Configuration conf = new Configuration();
p2 = "bbbbbb";
conf.set("param", p2);
在setup函数中取变量值:
public static String p4 = "";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    p4 = context.getConfiguration().get("param");
}
第三种:使用DistributedCache
第四种:直接将数据写到kv中
其实要传递某个数据,有时我们可以将数据直接放到key或value中,进行 传递。

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注