Sqoop2 imports data on top of the MapReduce framework. Sqoop2 works out the data-split information and passes it to the MapReduce framework; each data split corresponds to one map task.
The MapreduceSubmissionEngine class implements job submission.
Its initialize method reads the job configuration from the specified configuration directory; these are all global settings.
It then sets up the job-related Driver, the Connectors for the source and destination data, and the notification configuration.
Finally it specifies the Mapper and the Input/Output related classes.
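A minimal sketch of what that wiring amounts to is shown below. Only SqoopInputFormat, SqoopMapper and SqoopWritable appear in the code discussed later; the method sequence and the map-only assumption are mine, and the Sqoop2 classes are assumed to be on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical illustration of the job wiring done by the submission engine.
public class SubmissionSketch {
  public static Job configureJob(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf);
    job.setInputFormatClass(SqoopInputFormat.class);  // custom InputFormat described below
    job.setMapperClass(SqoopMapper.class);            // extraction runs inside the map tasks
    job.setMapOutputKeyClass(SqoopWritable.class);    // record in the intermediate data format
    job.setMapOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0);                         // assumption: loading is handled by the output side
    return job;
  }
}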
The data sources Sqoop2 supports include traditional databases, Kafka, and so on. Because MapReduce only supports text-style data sources out of the box, Sqoop2 implements its own InputFormat.
First, some background on MapReduce's InputFormat. An InputFormat represents one type of data source: it is responsible for splitting the data, and the metadata of each split is described by an InputSplit. A RecordReader then reads the data for a given InputSplit and feeds it to the Mapper.
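For reference, the contract that Sqoop2 has to fill in looks roughly like this (a simplified excerpt of org.apache.hadoop.mapreduce.InputFormat):

// Simplified view of the Hadoop MapReduce InputFormat API.
public abstract class InputFormat<K, V> {

  // Split the input into logical chunks; each InputSplit becomes one map task.
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  // Create a reader that turns one InputSplit into (key, value) pairs for the Mapper.
  public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;
}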
SqoopInputFormat is Sqoop2's custom InputFormat. Let's first look at how it defines its data splits:
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    // Instantiate the Partitioner class
    String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);

    // Read the partition-related configuration
    PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);

    long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema);

    // Ask the Partitioner for the partitions
    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);

    // Wrap each Partition in an InputSplit
    List<InputSplit> splits = new LinkedList<InputSplit>();
    for (Partition partition : partitions) {
      LOG.debug("Partition: " + partition);
      SqoopSplit split = new SqoopSplit();
      // Store the Partition in the split
      split.setPartition(partition);
      splits.add(split);
    }

    if (splits.size() > maxPartitions) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
          String.format("Got %d, max was %d", splits.size(), maxPartitions));
    }

    return splits;
  }
}
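Each SqoopSplit is essentially a Writable wrapper around a Partition, so the partition object can be serialized with the job and rebuilt inside the map task. Below is a minimal sketch of that idea, assuming Partition exposes write/readFields for its own fields; the real SqoopSplit may differ in details.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.job.etl.Partition;

// Sketch of an InputSplit that carries a connector Partition across the wire.
public class SqoopSplitSketch extends InputSplit implements Writable {

  private Partition partition;

  public void setPartition(Partition partition) { this.partition = partition; }
  public Partition getPartition() { return partition; }

  @Override
  public void write(DataOutput out) throws IOException {
    // Record the concrete Partition class so the map task can re-instantiate it,
    // then let the partition serialize its own fields.
    out.writeUTF(partition.getClass().getName());
    partition.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    String className = in.readUTF();
    try {
      partition = (Partition) Class.forName(className).newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IOException(e);
    }
    partition.readFields(in);
  }

  @Override
  public long getLength() { return 0; }                     // no meaningful byte length for a logical split

  @Override
  public String[] getLocations() { return new String[0]; }  // no data-locality information
}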
The split logic itself is the responsibility of the Partitioner. Here we take GenericJdbcPartitioner as an example; it handles splitting database tables.
The Partitioner implements a different split strategy depending on the type of the partition column.
public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

  // Split on an integer column
  protected List<Partition> partitionIntegerColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    // Minimum and maximum value of the partition column
    long minValue = partitionMinValue == null ? Long.MIN_VALUE : Long.parseLong(partitionMinValue);
    long maxValue = Long.parseLong(partitionMaxValue);

    // Width of each partition, and the leftover values
    long interval = (maxValue - minValue) / numberPartitions;
    long remainder = (maxValue - minValue) % numberPartitions;

    // When the value range is smaller than numberPartitions:
    // e.g. maxValue = 5, minValue = 1, numberPartitions = 10,
    // numberPartitions is reduced to the remainder, which is 4.
    if (interval == 0) {
      numberPartitions = (int) remainder;
    }

    long lowerBound;
    long upperBound = minValue;
    // Note that i starts at 1: the loop creates numberPartitions - 1 partitions
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      // Hand out the remainder, one extra value each, to the leading partitions
      upperBound += (i <= remainder) ? 1 : 0;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    // Add the last partition, which includes maxValue
    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  // Build the WHERE condition: lowerBound <= column < upperBound,
  // or lowerBound <= column <= upperBound for the last partition
  protected String constructConditions(Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append(lowerBound);
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append(upperBound);
    return conditions.toString();
  }
}
First interval and remainder are computed. Every partition covers interval values, and the leftover remainder values are handed out one per partition to the leading partitions. The last partition includes maxValue. Take the following as an example:
maxValue = 23, minValue = 1, numberPartitions = 5
interval  = (23 - 1) / 5 = 4
remainder = (23 - 1) % 5 = 2
The resulting partitions are:
Start   End              Count
1       6  (exclusive)   5
6       11 (exclusive)   5
11      15 (exclusive)   4
15      19 (exclusive)   4
19      23 (inclusive)   5
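Feeding these bounds through constructConditions, and assuming the partition column is named id (the column name is hypothetical), the five map tasks would each get one of the following WHERE conditions:

1 <= id AND id < 6
6 <= id AND id < 11
11 <= id AND id < 15
15 <= id AND id < 19
19 <= id AND id <= 23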
SqoopRecordReader's initialize method is called before the Mapper runs and stores the split. Its getCurrentKey method returns that split.
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  @Override
  public RecordReader<SqoopSplit, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new SqoopRecordReader();
  }

  public static class SqoopRecordReader extends RecordReader<SqoopSplit, NullWritable> {

    private boolean delivered = false;
    private SqoopSplit split = null;

    @Override
    public boolean nextKeyValue() {
      if (delivered) {
        return false;
      } else {
        delivered = true;
        return true;
      }
    }

    @Override
    public SqoopSplit getCurrentKey() {
      return split;
    }

    @Override
    public NullWritable getCurrentValue() {
      return NullWritable.get();
    }

    @Override
    public void close() {
    }

    @Override
    public float getProgress() {
      return delivered ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      this.split = (SqoopSplit) split;
    }
  }
}
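The delivered flag makes nextKeyValue return true exactly once, so each map task sees exactly one key: its SqoopSplit. For context, the standard Hadoop driver loop looks roughly like the sketch below; SqoopMapper overrides run directly, but relies on the same single-key contract.

// Simplified version of org.apache.hadoop.mapreduce.Mapper#run;
// with SqoopRecordReader the loop body executes exactly once per task.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}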
SqoopMapper extends MapReduce's Mapper class; it reads the partition information from the SqoopSplit, then extracts and transfers the data.
public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {

  // Thread that periodically reports progress
  private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();

  private IntermediateDataFormat<Object> fromIDF = null;
  private IntermediateDataFormat<Object> toIDF = null;
  private Matcher matcher;

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
    // Instantiate the Extractor class
    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);

    // Get the schema information for both sides
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
    Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
    matcher = MatcherFactory.getMatcher(fromSchema, toSchema);

    String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT);
    fromIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass);
    fromIDF.setSchema(matcher.getFromSchema());
    String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
    toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass);
    toIDF.setSchema(matcher.getToSchema());

    PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);

    // Get this task's SqoopSplit, which carries the Partition
    SqoopSplit split = context.getCurrentKey();
    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);

    try {
      LOG.info("Starting progress service");
      // Report progress every 2 minutes
      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);

      LOG.info("Running extractor class " + extractorName);
      // Extract and convert the data
      extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
      LOG.info("Extractor has finished");
      // Update the count of rows read
      context.getCounter(SqoopCounters.ROWS_READ).increment(extractor.getRowsRead());
    } catch (Exception e) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
    } finally {
      LOG.info("Stopping progress service");
      progressService.shutdown();
      if (!progressService.awaitTermination(5, TimeUnit.SECONDS)) {
        LOG.info("Stopping progress service with shutdownNow");
        progressService.shutdownNow();
      }
    }
  }
}

public class SqoopProgressRunnable implements Runnable {

  public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);

  private final TaskInputOutputContext<?, ?, ?, ?> context;

  public SqoopProgressRunnable(final TaskInputOutputContext<?, ?, ?, ?> ctx) {
    this.context = ctx;
  }

  @Override
  public void run() {
    // Report progress so the task is not killed for inactivity
    LOG.debug("Auto-progress thread reporting progress");
    this.context.progress();
  }
}
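The Extractor instantiated in run() belongs to Sqoop2's connector SPI: conceptually it reads the rows described by the Partition and hands each one to the DataWriter carried by the ExtractorContext (here a SqoopMapDataWriter, which converts the row into the intermediate data format and writes it to the MapReduce context). The sketch below shows the general shape of such an extractor; the SQL building, connection handling, hypothetical table name and the exact DataWriter method are assumptions, not the real GenericJdbcExtractor.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;

// Hypothetical extractor sketch: runs "SELECT ... WHERE <partition conditions>"
// and forwards every row to the framework's DataWriter.
public class JdbcExtractorSketch extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {

  private long rowsRead = 0;

  @Override
  public void extract(ExtractorContext context, LinkConfiguration linkConfig,
                      FromJobConfiguration jobConfig, GenericJdbcPartition partition) {
    DataWriter writer = context.getDataWriter();
    // getConditions() is assumed to mirror the setConditions() call in the partitioner above
    String sql = "SELECT * FROM my_table WHERE " + partition.getConditions();  // table name is hypothetical
    try (Connection conn = DriverManager.getConnection("jdbc:...");            // connection details assumed
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
      ResultSetMetaData meta = rs.getMetaData();
      while (rs.next()) {
        Object[] row = new Object[meta.getColumnCount()];
        for (int i = 0; i < row.length; i++) {
          row[i] = rs.getObject(i + 1);
        }
        writer.writeArrayRecord(row);  // hand one record to the framework
        rowsRead++;
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public long getRowsRead() {
    return rowsRead;
  }
}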
To extend the data sources MapReduce can read, Sqoop2 implements its own InputFormat machinery, consisting of SqoopSplit, SqoopInputFormat and the Partitioner. SqoopMapper then obtains the Partition from the SqoopSplit and carries out the data transfer.