【摘要】Sqoop是一种用于在Apache Hadoop和结构化数据存储(如关系数据库)之间高效传输批量数据的工具 。本文将简单介绍Sqoop做业执行时相关的类及方法,并将该过程与MapReduce的执行结合,分析数据如何从源端迁移到目的端。sql
抛开MR的执行过程,Sqoop执行时用到的关键类总共有5个,Initializer、Partitioner、Extractor、Loader、Destroyer。执行流程以下图所示数据库
所以,每次新建一个链接器都要实现上述5个类。apache
Initializer是在sqoop任务提交到MR以前被调用,主要是作迁移前的准备,例如链接数据源,建立临时表,添加依赖的jar包等。它是sqoop做业生命周期的第一步,主要API以下segmentfault
public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);并发
public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration){app
return new LinkedList<String>();
}工具
public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration) {oop
return new NullSchema();
}spa
其中getSchema()方法被From或者To端的connector在提取或者载入数据时用来匹配数据。例如,一个GenericJdbcConnector会调用它获取源端Mysql的数据库名,表名,表中的字段信息等。线程
Destroyer 是在做业执行结束后被实例化,这是Sqoop做业的最后一步。清理任务,删除临时表,关闭链接器等。
public abstract void destroy(DestroyerContext context,
LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);
Partitioner建立分区Partition,Sqoop默认建立10个分片,主要API以下
public abstract List<Partition> getPartitions(PartitionerContext context,
LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);
Partition类中实现了readFields()方法和write()方法,方便读写
public abstract class Partition {
public abstract void readFields(DataInput in) throws IOException;
public abstract void write(DataOutput out) throws IOException;
public abstract String toString();
}
Extractor类根据分片partition和配置信息从源端提取数据,写入SqoopMapDataWriter中,SqoopMapDataWriter是SqoopMapper的内部类它继承了DataWriter类。此外它打包了SqoopWritable类,以中间数据格式保存从源端读取到的数据。
public abstract void extract(ExtractorContext context,
LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration, SqoopPartition partition);
该方法内部核心代码以下
while (resultSet.next()) {
...
context.getDataWriter().writeArrayRecord(array);
...
}
loader从源端接受数据,并将其载入目的端,它必须实现以下接口
public abstract void load(LoaderContext context,
ConnectionConfiguration connectionConfiguration, JobConfiguration jobConfiguration) throws Exception;
load方法从SqoopOutputFormatDataReader中读取,它读取“中间数据格式表示形式” _中的数据并将其加载到数据源。此外Loader必须迭代的调用DataReader()直到它读完。
while ((array = context.getDataReader().readArrayRecord()) != null) {
...
}
上一节避开MR执行过程,仅仅从Extractor和Loader过程描述迁移过程。下面将结合MR的执行过程详细的介绍一个Sqoop迁移做业流程。
初始化
1)做业初始化阶段,SqoopInputFormat读取给源端数据分片的过程
这里每一个Partition分片会交给一个Mapper执行。每一个Mapper分别启动一个extractor线程和Loader线程迁移数据。
Mapper
2)做业执行阶段的Mapper过程
private Class SqoopMapDataWriter extends DataWriter {
... private void writeContent() { ... context.wirte(writable, NullWritable.get()); // 这里的writable 是SqoopWritable的一个对象 ... } ...
}
注意:这里的Context中存的是KV对,K是SqoopWritable,而V仅是一个空的Writable对象。SqoopWritable中实现了write和readField,用于序列化和反序列化。
Reducer
3)做业执行阶段的Reduce过程,
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
executorService = Executors.newSingleThreadExecutor(...); consumerFuture = executorService.submit(new ConsumerThread(context)); return writer;
}
private class ConsumerThread implements Runnable {
... public void run() { ... Loader.load(loaderContext, connectorLinkConfig, ConnectorToJobConfig); ... } ...
}
注意:
以上即为Sqoop做业执行时相关的类及方法内容,但愿对你们在进行数据迁移过程当中有所帮助。