hadoop为序列化提供了优化,类型的比较对M/R而言相当重要,Key和Key的比较也是在排序阶段完成的,hadoop提供了原生的比较器接口RawComparator<T>用于序列化字节间的比较,该接口容许其实现直接比较数据流中的记录,无需反序列化为对象,RawComparator是一个原生的优化接口类,它只是简单的提供了用于数据流中简单的数据对比方法,从而提供优化:java
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
该类并不是被多数的衍生类所实现,其具体的子类为WritableComparator,多数状况下是做为实现Writable接口的类的内置类,提供序列化字节的比较。下面是RawComparator接口内置类的实现类图:数组
首先,咱们看 RawComparator的具体实现类WritableComparator:安全
WritableComparator类相似于一个注册表,里面记录了全部Comparator类的集合。函数
Comparators成员用一张Hash表记录Key=Class,value=WritableComprator的注册信息.oop
WritableComparator主要提供了两个功能优化
1. 提供了对原始compare()方法的一个默认实现this
默认实现是 先反序列化为对像 再经过 对像比较(有开销的问题)spa
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {线程
try {code
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
return compare(key1, key2); // compare them
}
而对应的基础数据类型的compare()的实现却巧妙的利用了特定类型的泛化:(利用了writableComparable的compareTo方法)
public int compare(WritableComparable a, WritableComparable b) {
return a.compareTo(b);
}
例如IntWritable实例是调用了IntWritable里的compareTo方法
public int compareTo(Object o) {
int thisValue = this.value;
int thatValue = ((IntWritable)o).value;
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
2. 充当RawComparable实例的工厂,以注册Writable的实现
例如,为了获取IntWritable的Comparator,能够直接调用其get方法。
WritableComparator:
关键代码:
代码1:registry 注册器
----------------------------------------------------------------
// registry 注册器:记载了WritableComparator类的集合
private static HashMap<Class, WritableComparator>comparators =
new HashMap<Class, WritableComparator>();
代码2:获取WritableComparator实例
说明:hashMap做为容器类线程不安全,故须要synchronized同步,get方法根据key=Class返回对应的WritableComparator,若返回的是空值NUll,则调用protected Constructor进行构造,而其两个protected的构造函数实则是调用了newKey()方法进行NewInstance
public static synchronized WritableComparator get(Class<? extends WritableComparable> c) { WritableComparator comparator = comparators.get(c); if (comparator == null) comparator = new WritableComparator(c, true); return comparator; }
代码3:构造方法
---------------------------------------------------------------
new WritableComparator(c, true) WritableComparator的构造函数源码以下: /* * keyClass,key1,key2和buffer都是用于WritableComparator的构造函数 */ private final Class<? extends WritableComparable> keyClass; private final WritableComparable key1; //WritableComparable接口 private final WritableComparable key2; private final DataInputBuffer buffer; //输入缓冲流 protected WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances) { this.keyClass = keyClass; if (createInstances) { key1 = newKey(); key2 = newKey(); buffer = new DataInputBuffer(); } else { key1 = key2 = null; buffer = null; } }
上述的keyClass,key1,key2,buffer是记录HashMap对应的key值,用于WritableComparator的构造函数,但由其构造函数中咱们能够看出WritableComparator根据Boolean createInstance来判断是否实例化key1,key2和buffer,而key1,key2做为实现了WritableComparable接口的标识,在WritableComparator的构造函数里面经过newKey()的方法去实例化实现WritableComparable接口的一个对象,下面是newKey()的源码,经过hadoop自身的反射去实例化了一个WritableComparable接口对象。
public WritableComparable newKey() { return ReflectionUtils.newInstance(keyClass, null); }
代码4:Compare()方法
---------------------------------------------------------------------
1. public int compare(Object a, Object b);
2. public int compare(WritableComparable a, WritableComparable b);
3. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
三个compare()重载方法中,compare(Object a, Object b)利用子类塑形为WritableComparable而调用了第2个compare方法,而第2个Compare()方法则调用了Writable.compaerTo();最后一个compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法源码以下:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them }
Compare方法的一个缺省实现方式,根据接口key1,ke2反序列化为对象再进行比较。
利用Buffer为桥接中介,把字节数组存储为buffer后,调用key1(WritableComparable)的反序列化方法,再来比较key1,ke2,由此处能够看出,该compare方法是将要比较的二进制流反序列化为对象,再调用方法第2个重载方法进行比较。
代码5:方法define方法
该方法用于注册WritebaleComparaor对象到注册表中,注意同时该方法也须要同步,代码以下:
public static synchronized void define(Class c, WritableComparator comparator) { comparators.put(c, comparator); }
代码5:余下诸如readInt的静态方法
---------------------------------------------------------------------
这些方法用于实现WritableComparable的各类实例,例如 IntWritable实例:内部类Comparator类须要根据本身的IntWritable类型重载WritableComparator里面的compare()方法,能够说WritableComparator里面的compare()方法只是提供了一个缺省的实现,而真正的compare()方法实现须要根据本身的类型如IntWritable进行重载,因此WritableComparator方法中的那些readInt..等方法只是底层的封装的一个实现,方便内部Comparator进行调用而已。
下面咱们着重看下BooleanWritable类的内置RawCompartor<T>的实现过程:
/** * A Comparator optimized for BooleanWritable. */ public static class Comparator extends WritableComparator { public Comparator() {//调用父类的Constructor初始化keyClass=BooleanWrite.class super(BooleanWritable.class); } //重写父类的序列化比较方法,用些类用到父类提供的缺省方法 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { boolean a = (readInt(b1, s1) == 1) ? true : false; boolean b = (readInt(b2, s2) == 1) ? true : false; return ((a == b) ? 0 : (a == false) ? -1 : 1); } } //注册 static { WritableComparator.define(BooleanWritable.class, new Comparator()); }
总结:
hadoop 相似于Java的类包,即提供了Comparable接口(对应于writableComparable接口)和Comparator类(对应于RawComparator类)用于实现序列化的比较,在hadoop 的IO包中已经封装了JAVA的基本数据类型用于序列化和反序列化,通常本身写的类实现序列化和反序列化须要继承WritableComparable接口而且内置一个Comparator(继承于WritableComparator)的格式来实现本身的对象。