Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the reverse process of turning a byte stream back into a series of structured objects.
Serialization appears in two main areas of distributed data processing: interprocess communication and permanent storage.
In Hadoop, interprocess communication between nodes is implemented using remote procedure calls (RPC). An RPC protocol serializes a message into a binary stream before sending it to the remote node, and the remote node then deserializes the binary stream back into the original message.
In Hadoop, the Writable interface defines two methods: one for writing an object's state to a DataOutput binary stream, and one for reading its state back from a DataInput binary stream.
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
The write() and readFields() methods implement serialization and deserialization of the object, respectively.
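Because DataOutput and DataInput are ordinary JDK interfaces, the write/readFields pattern can be sketched in plain Java without Hadoop on the classpath. The IntBox class below is a hypothetical stand-in for IntWritable, used only to illustrate the round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// IntBox is a hypothetical stand-in for IntWritable: it follows the
// write/readFields pattern but does not implement Hadoop's interface.
class IntBox {
    private int value;

    void set(int value) { this.value = value; }
    int get() { return value; }

    // Serialize state to a binary stream (the role of Writable.write)
    void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    // Restore state from a binary stream (the role of Writable.readFields)
    void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }
}

public class WritablePattern {
    public static void main(String[] args) throws IOException {
        IntBox original = new IntBox();
        original.set(163);

        // write: object state -> byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields: byte array -> object state
        IntBox restored = new IntBox();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored.get()); // prints 163
    }
}
```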
Let's look at a particular Writable and see what we can do with it. We will use IntWritable, a wrapper for a Java int. We can create one and set its value with the set() method:
IntWritable writable = new IntWritable();
writable.set(163);
Equivalently, we can use the constructor that takes the integer value:

IntWritable writable = new IntWritable(163);
To examine the serialized form of the IntWritable, we write a small helper method that captures the bytes of the serialized stream in a byte array:

public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}
byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));
The bytes are written in big-endian order:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));
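The four bytes 00 00 00 a3 are simply the big-endian encoding of 163 (0xa3) that java.io.DataOutputStream.writeInt produces, which is the call IntWritable.write makes internally. A quick check without any Hadoop dependency:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class HexCheck {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeInt(163); // writes 4 bytes, big-endian
        dataOut.close();

        // Render each byte as two hex digits, like StringUtils.byteToHexString
        StringBuilder hex = new StringBuilder();
        for (byte b : out.toByteArray()) {
            hex.append(String.format("%02x", b & 0xff));
        }
        System.out.println(hex); // prints 000000a3
    }
}
```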
Let's try deserialization. We create a helper method to read a Writable object from a byte array:
public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}

We construct a new, value-less IntWritable, then call deserialize() to read from the output data that we just wrote. We can then check that its value, retrieved with get(), is still the original value, 163:
IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));
IntWritable implements the WritableComparable interface, which is a subinterface of both Writable and java.lang.Comparable:
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is crucial for MapReduce, where keys are compared with one another during the sorting phase. One optimization that Hadoop provides is the RawComparator interface, an extension of Java's Comparator:
package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
package java.util;

public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

This interface permits implementors to compare records read from a stream without deserializing them into objects, thereby avoiding all the overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer directly from each of the byte arrays b1 and b2, starting at the given offsets (s1 and s2) with the given lengths (l1 and l2), and comparing the two values directly.
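To see why a raw comparator saves object creation, here is a minimal pure-Java sketch (not Hadoop's actual implementation) that compares two serialized ints directly in their byte arrays, decoding the big-endian value at the given offset rather than constructing any objects:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RawIntCompare {
    // Decode a big-endian int from bytes[s..s+4), as DataOutputStream wrote it.
    static int readInt(byte[] b, int s) {
        return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
             | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
    }

    // RawComparator-style compare: operates on the serialized bytes directly,
    // so no objects are created and nothing is deserialized.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    static byte[] serialize(int v) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new DataOutputStream(out).writeInt(v);
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] b1 = serialize(163);
        byte[] b2 = serialize(67);
        // 163 > 67, so the raw comparison is positive
        System.out.println(compare(b1, 0, b1.length, b2, 0, b2.length) > 0);
    }
}
```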
WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions. First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the objects' compareTo() method. Second, it acts as a factory for RawComparator instances that Writable implementations have registered. For example, to obtain a comparator for IntWritable, we just use:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
The source of WritableComparator's get() method:
private static HashMap<Class, WritableComparator> comparators =
    new HashMap<Class, WritableComparator>(); // registry

/** Get a comparator for a {@link WritableComparable} implementation. */
public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
    WritableComparator comparator = comparators.get(c);
    if (comparator == null)
        comparator = new WritableComparator(c, true);
    return comparator;
}
The comparator can be used to compare two IntWritable objects:
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));
Or their serialized representations:
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

The source of WritableComparator's compare() method:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
        buffer.reset(b1, s1, l1); // parse key1
        key1.readFields(buffer);
        buffer.reset(b2, s2, l2); // parse key2
        key2.readFields(buffer);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return compare(key1, key2); // compare them
}

@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
    return a.compareTo(b);
}
Reference: Hadoop: The Definitive Guide