博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop之定制自己的sort过程
阅读量:4952 次
发布时间:2019-06-12

本文共 6387 字,大约阅读时间需要 21 分钟。

Key排序

1. 继承WritableComparator

  在中,可以看到mapper的输出文件spill文件需要在内存中排序,并且在输入reducer之前,不同的mapper的数据也会排序,排序是根据数据的key进行的.

如果key是用户自定义的类型,并没有默认的比较函数时,就需要自己定义key的比较函数,也就是继承WritableComparator.事例代码如下:

public static class KeyComparator extends WritableComparator {    protected KeyComparator() {      super(IntPair.class, true);    }    @Override    public int compare(WritableComparable w1, WritableComparable w2) {      IntPair ip1 = (IntPair) w1;      IntPair ip2 = (IntPair) w2;      // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数      // 这里是先比较年份,再比较温度,按温度降序排序      int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());      if (cmp != 0) {        return cmp;      }      return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse    }  }

例子中对IntPair定义了新的compare函数,并在main函数中通过下面的方式实现替换:

job.setSortComparatorClass(KeyComparator.class);

 2.实现 WritableComparable接口

看下面的例子代码:

static class  NewK2 implements WritableComparable
{ Long first; Long second; public NewK2(){} public NewK2(long first, long second){ this.first = first; this.second = second; } @Override public void readFields(DataInput in) throws IOException { this.first = in.readLong(); this.second = in.readLong(); } @Override public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. * 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { final long minus = this.first - o.first; if(minus !=0){ return (int)minus; } return (int)(this.second - o.second); } @Override public int hashCode() { return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } }

 

如果是按照上述的例子实现的,不需要在main函数中设置其他的代码.

Group排序

   一般来说,如果用户自定义了key的排序过程,那么在reducer之前的对数据进行分组的过程就要重新编写,而且一般来说,partitioner也需要重新定义,请参考 .

 shuffle阶段,虽然使用的是hash的方法,我们并不能保证映射到同一个reducer的key的hash值都是一样的,对于不同的hash值要进行分群,然后再执行reduce.下面是自定义groupcomparator的例子:

public static class GroupComparator extends WritableComparator {      protected GroupComparator() {        super(IntPair.class, true);      }      @Override      public int compare(WritableComparable w1, WritableComparable w2) {        IntPair ip1 = (IntPair) w1;        IntPair ip2 = (IntPair) w2;      // 这里是按key的第一个参数来聚合,就是年份        return IntPair.compare(ip1.getFirst(), ip2.getFirst());      }    }

例子中实现了对于IntPair类型的分群比较函数的重新定义.在main函数中通过下面的方式进行调用:

job.setGroupingComparatorClass(GroupComparator.class);

二次排序

   下面是对地区温度进行的统计,要求输出各个年份的最大温度,例子中定制了自己的partitioner:FirstPartitioner来对组合后的类型进行分组,实际上还是按照年份进行的分组;定制了自己的keycomparator:KeyComparator,先比较年份,然后再比较温度;定制了自己的分群比较类:GroupComparator,也是按照年份进行分群,然后扔给reducer进行处理.

  值得一提的是,为什么不用传统的mapreduce,按照年份进行进行map,然后在reduce中,遍历每年不同的温度,找到最大值呢?原因之一就是效率的问题,sort操作本身就要在MP框架中执行,而且已经做了很多优化,通过设置比较的不同手段,很容易实现比较,然而在reducer处理中进行遍历,显然比上面的sort过程要慢.下面是例子的完整代码,摘自.

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {  // Map任务  static class MaxTemperatureMapper extends MapReduceBase implements Mapper
{ private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, OutputCollector
output, Reporter reporter) throws IOException { parser.parse(value); // 解析输入的文本 if (parser.isValidTemperature()) { // 这里把年份与温度组合成一个key,value为空 output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get()); } }}// Reduce任务static class MaxTemperatureReducer extends MapReduceBase implements Reducer
{ public void reduce(IntPair key, Iterator
values, OutputCollector
output, Reporter reporter) throws IOException { // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列 // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值 output.collect(key, NullWritable.get()); }}// 切分器,这里是按年份* 127 % reduceNum来进行切分的public static class FirstPartitioner implements Partitioner
{ @Override public void configure(JobConf job) {} @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; }}// 聚合key的一个比较器public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse }} // 设置聚合比较器 public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里是按key的第一个参数来聚合,就是年份 return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws IOException { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(MaxTemperatureMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); // 设置key的一个组合类型,如里这个类型实现了WritableComparable
的话,那就不要设置setOutputKeyComparatorClass了. job.setOutputValueClass(NullWritable.class); // 输出的value为NULL,因为这里的实际value已经组合到了key中 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); }}

转载于:https://www.cnblogs.com/wzyj/p/4693131.html

你可能感兴趣的文章
sqlite
查看>>
机电行业如何进行信息化建设
查看>>
Windows Azure Platform Introduction (4) Windows Azure架构
查看>>
【转】chrome developer tool 调试技巧
查看>>
mahout运行测试与kmeans算法解析
查看>>
互相给一巴掌器
查看>>
Android SDK环境变量配置
查看>>
VM10虚拟机安装图解
查看>>
9、总线
查看>>
Git 笔记 - section 1
查看>>
JZOJ 4.1 B组 俄罗斯方块
查看>>
HDU6409 没有兄弟的舞会
查看>>
2018 Multi-University Training Contest 10 - TeaTree
查看>>
HDU6205 card card card
查看>>
2018 Multi-University Training Contest 10 - Count
查看>>
HDU6198 number number number
查看>>
HDU6438 Buy and Resell
查看>>
HDU6446 Tree and Permutation
查看>>
HDU6201 transaction transaction transaction
查看>>
HDU6203 ping ping ping
查看>>