DataJoin: the join logic

/**
   * Perform the actual join recursively.
   * 
   * @param tags
   *          a list of input tags
   * @param values
   *          a list of value lists, each corresponding to one input source
   * @param pos
   *          indicating the next value list to be joined
   * @param partialList
   *          a list of values, each from one value list considered so far.
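   * @param key
   *          the join key shared by all values in this group
   * @param output
   *          the collector that receives the combined records
   * @param reporter
   *          the reporter passed through to collect()
   * @throws IOException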
   */
  private void joinAndCollect(Object[] tags, ResetableIterator[] values,
                              int pos, Object[] partialList, Object key,
                              OutputCollector output, Reporter reporter) throws IOException {

    if (values.length == pos) {
      // get a value from each source. Combine them
      TaggedMapOutput combined = combine(tags, partialList);
      collect(key, combined, output, reporter);
      return;
    }
    ResetableIterator nextValues = values[pos];
    nextValues.reset();
    while (nextValues.hasNext()) {
      Object v = nextValues.next();
      partialList[pos] = v;
      joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
    }
  }
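The recursion above picks one value from each source in every possible way. In other words, it enumerates the Cartesian product of the value lists for the current key. The sketch below reproduces only that recursion outside Hadoop. It assumes plain Java Lists in place of ResetableIterator, and it records each complete partialList instead of calling combine() and collect(). The class name CrossProductJoin is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of the joinAndCollect recursion.
// Each inner List stands in for one ResetableIterator (one source's values).
class CrossProductJoin {

    // Fill partial[pos] with each value of source pos in turn and recurse;
    // once every position is filled, record a copy of partial (this is where
    // joinAndCollect would call combine() and collect()).
    static void joinAndCollect(List<List<String>> values, int pos,
                               String[] partial, List<List<String>> out) {
        if (pos == values.size()) {
            out.add(new ArrayList<>(Arrays.asList(partial)));
            return;
        }
        // A for-each loop starts from the first element every time,
        // which plays the role of nextValues.reset().
        for (String v : values.get(pos)) {
            partial[pos] = v;
            joinAndCollect(values, pos + 1, partial, out);
        }
    }

    // Convenience entry point: start at source 0 with an empty partial list.
    static List<List<String>> join(List<List<String>> values) {
        List<List<String>> out = new ArrayList<>();
        joinAndCollect(values, 0, new String[values.size()], out);
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> values = List.of(
                List.of("a1", "a2"),
                List.of("b1", "b2", "b3"));
        for (List<String> combo : join(values)) {
            System.out.println(combo);
        }
    }
}
```

Running main prints six combinations, from [a1, b1] through [a2, b3]. If any source has no values for the key, its loop body never runs and nothing is produced. A key that is missing from one source therefore yields no combined records.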

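tags lists the data sources taking part in the join (one tag per source), so its length is the number of sources. For example: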

Customer data:

Customer ID   Name   PhoneNumber
1             赵一   025-5455-566
2             钱二   025-4587-565
3             孙三   021-5845-5875

Customer orders:

Customer ID   Order ID   Price   Date
2             1          93      2008-01-08
3             2          43      2012-01-21
1             3          43      2012-05-12
2             4          32      2012-5-14

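Here there are two sources, so tags has length 2. partialList[] holds the records matched from the two sources for one join key. For Customer ID 2, for example:

partialList[0] is the customer record:   2   钱二   025-4587-565

partialList[1] is an order record:       2   1   93   2008-01-08

Customer 2 has two orders (order IDs 1 and 4). The recursion therefore also sets partialList[1] to the order 4 record (2   4   32   2012-5-14), and combine() is called twice for this key.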
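To see where these partialList entries come from, the sketch below groups the sample rows by Customer ID. With only two sources, it unrolls the recursion into two nested loops: the outer loop fills partialList[0] and the inner loop fills partialList[1]. This is a standalone plain-Java simulation, not Hadoop code. The class SampleJoinDemo and the comma-separated record format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a reduce-side join over the sample tables:
// records are grouped by Customer ID, then by source, then cross-joined.
class SampleJoinDemo {

    static final String[] CUSTOMERS = {
        "1,赵一,025-5455-566",
        "2,钱二,025-4587-565",
        "3,孙三,021-5845-5875"};
    static final String[] ORDERS = {
        "2,1,93,2008-01-08",
        "3,2,43,2012-01-21",
        "1,3,43,2012-05-12",
        "2,4,32,2012-5-14"};

    // Group one source's records by their first field (the join key).
    static void group(Map<String, List<List<String>>> groups, String[] records, int source) {
        for (String r : records) {
            String key = r.substring(0, r.indexOf(','));
            List<List<String>> perSource = groups.computeIfAbsent(key, k -> {
                List<List<String>> lists = new ArrayList<>();
                lists.add(new ArrayList<>()); // source 0: customers
                lists.add(new ArrayList<>()); // source 1: orders
                return lists;
            });
            perSource.get(source).add(r);
        }
    }

    static List<String> run() {
        Map<String, List<List<String>>> groups = new TreeMap<>();
        group(groups, CUSTOMERS, 0);
        group(groups, ORDERS, 1);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<List<String>>> e : groups.entrySet()) {
            // Outer loop fills partialList[0], inner loop fills partialList[1].
            for (String customer : e.getValue().get(0)) {
                for (String order : e.getValue().get(1)) {
                    out.add(e.getKey() + ": [" + customer + "] + [" + order + "]");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String row : run()) {
            System.out.println(row);
        }
    }
}
```

main prints four joined rows: one each for customers 1 and 3, and two for customer 2. The second row for customer 2 pairs the same customer record with order 4.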

The method you have to implement yourself:

/**
   * 
   * @param tags
   *          a list of source tags
   * @param values
   *          a value per source
   * @return combined value derived from values of the sources
   */
  protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);

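This is the method invoked by TaggedMapOutput combined = combine(tags, partialList); in joinAndCollect. It receives one record per source and is where you process the joined data.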
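What combine() does depends on your record format. The hypothetical CombineSketch below is a minimal sketch that assumes each record is a comma-separated String whose first field is the join key, rather than a real TaggedMapOutput. It concatenates one record per source and keeps the key only once. It returns null when fewer than two sources are present for the key, which is how this sketch signals "emit nothing" (inner-join behaviour). Check how collect() treats a null result in your Hadoop version before relying on that convention.

```java
import java.util.StringJoiner;

// Hypothetical stand-in for a combine() implementation, working on plain
// comma-separated Strings instead of TaggedMapOutput objects.
class CombineSketch {

    // tags: one entry per source present for this key;
    // values: one record per source, in the same order as tags.
    static String combine(Object[] tags, Object[] values) {
        // Fewer than two sources means the key appears in only one table:
        // return null so nothing is emitted (inner-join semantics in this sketch).
        if (tags.length < 2) {
            return null;
        }
        StringJoiner joined = new StringJoiner(",");
        for (int i = 0; i < values.length; i++) {
            String record = (String) values[i];
            // Keep the join key from the first source only; drop it from the rest.
            joined.add(i == 0 ? record : record.substring(record.indexOf(',') + 1));
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        Object[] tags = {"customers", "orders"};
        Object[] values = {"2,钱二,025-4587-565", "2,1,93,2008-01-08"};
        System.out.println(combine(tags, values));
    }
}
```

For the first combination of customer 2, main prints 2,钱二,025-4587-565,1,93,2008-01-08.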
