/**
 * Perform the actual join recursively.
 *
 * @param tags
 *          a list of input tags
 * @param values
 *          a list of value lists, each corresponding to one input source
 * @param pos
 *          indicating the next value list to be joined
 * @param partialList
 *          a list of values, each from one value list considered so far.
 * @param key
 * @param output
 * @throws IOException
 */
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
    int pos, Object[] partialList, Object key,
    OutputCollector output, Reporter reporter) throws IOException {

  if (values.length == pos) {
    // get a value from each source. Combine them
    TaggedMapOutput combined = combine(tags, partialList);
    collect(key, combined, output, reporter);
    return;
  }
  ResetableIterator nextValues = values[pos];
  nextValues.reset();
  while (nextValues.hasNext()) {
    Object v = nextValues.next();
    partialList[pos] = v;
    joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
  }
}
tags holds one tag per data source taking part in the join, so its length is the number of data sources. For example:
Customer data:
Customer ID    Name    PhoneNumber
1              赵一    025-5455-566
2              钱二    025-4587-565
3              孙三    021-5845-5875
Customer orders:
Customer ID    Order ID    Price    Date
2              1           93       2008-01-08
3              2           43       2012-01-21
1              3           43       2012-05-12
2              4           32       2012-5-14
Here tags has 2 entries, and partialList[] holds the records matched by the join, one from each of the 2 data sources. For example:
partialList[0] is 2 钱二 025-4587-565
partialList[1] is 2 1 93 2008-01-08
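The recursion can be tried outside Hadoop with a minimal sketch. It assumes plain `List<String>` values in place of `ResetableIterator`, gathers the results in a list instead of calling `combine` and `collect`, and romanizes the customer name as `Qian Er` to keep the source ASCII. The class name `JoinSketch` and the `join` helper are hypothetical, not part of the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone sketch of the recursive join; plain Lists replace ResetableIterator. */
class JoinSketch {

    /** Collects every combination that takes exactly one value from each source. */
    static void joinAndCollect(List<List<String>> values, int pos,
                               String[] partialList, List<String[]> out) {
        if (pos == values.size()) {
            // One value has been picked from every source. Store a copy,
            // because partialList is overwritten by the enclosing loops.
            out.add(partialList.clone());
            return;
        }
        for (String v : values.get(pos)) {
            partialList[pos] = v;
            joinAndCollect(values, pos + 1, partialList, out);
        }
    }

    /** Hypothetical helper: allocates partialList and the result list. */
    static List<String[]> join(List<List<String>> values) {
        List<String[]> out = new ArrayList<>();
        joinAndCollect(values, 0, new String[values.size()], out);
        return out;
    }

    public static void main(String[] args) {
        // Records grouped under join key 2 in the example tables
        // (customer name romanized for this sketch).
        List<List<String>> values = Arrays.asList(
            Arrays.asList("2 Qian Er 025-4587-565"),
            Arrays.asList("2 1 93 2008-01-08", "2 4 32 2012-5-14"));
        for (String[] row : join(values)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

For key 2 this produces two combinations, one per order, each pairing the single customer record with one order record. If any source has no values for a key, the loop at that position never runs and nothing is emitted.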
The method you need to implement yourself:
/**
 *
 * @param tags
 *          a list of source tags
 * @param values
 *          a value per source
 * @return combined value derived from values of the sources
 */
protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
This is the method invoked by the line TaggedMapOutput combined = combine(tags, partialList); in joinAndCollect.
It is where the joined records are processed.
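What `combine` does with the matched records is up to the job. One possibility is sketched below as a stand-alone class, not as a real `DataJoinReducerBase` subclass. It assumes records are space-separated strings with the join key first, returns a `String` instead of a `TaggedMapOutput`, and returns `null` when fewer than two sources are present, on the assumption that the caller skips `null` results (an inner join). The class name `CombineSketch` and the tag names `customers` and `orders` are hypothetical.

```java
/** Stand-alone sketch of a combine step; Strings replace TaggedMapOutput. */
class CombineSketch {

    /**
     * Merges one record per source into a single line, dropping the
     * join key that each record repeats as its first field.
     */
    static String combine(Object[] tags, Object[] values) {
        if (tags.length < 2) {
            // Assumption: a missing source means no match, and the caller
            // ignores null results.
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            // Split into "key" and "rest of the record".
            String[] parts = values[i].toString().split(" ", 2);
            if (i > 0) {
                joined.append(' ');
            }
            joined.append(parts.length > 1 ? parts[1] : "");
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        Object[] tags = {"customers", "orders"};
        Object[] partialList = {"2 Qian Er 025-4587-565", "2 1 93 2008-01-08"};
        System.out.println(combine(tags, partialList));
    }
}
```

In an actual job the same logic would wrap the merged text in the job's own `TaggedMapOutput` subclass before returning it.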