Sqoop2 reads data from a source and then writes it to a destination. Because the data transfer runs as a MapReduce job, Sqoop2 implements its own OutputFormat class, which supports loading results into Hive, Kafka, relational databases, and other destination types.
public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {

  @Override
  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
    SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context);
    return executor.getRecordWriter();
  }
}
SqoopNullOutputFormat extends OutputFormat. Its getRecordWriter method delegates to SqoopOutputFormatLoadExecutor, which supplies the actual RecordWriter.
toDataFormat holds the record currently being handed off. Since a writer thread and a reader thread compete for it, access is coordinated with two semaphores, filled and free.
public class SqoopOutputFormatLoadExecutor {

  private volatile boolean readerFinished = false;
  private volatile boolean writerFinished = false;
  private volatile IntermediateDataFormat<? extends Object> toDataFormat;
  private Semaphore filled = new Semaphore(0, true); // starts at 0, so it must be released before it can be acquired
  private Semaphore free = new Semaphore(1, true);   // starts at 1, so it can be acquired right away

  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
    // Start the ConsumerThread, which processes the data placed in toDataFormat
    consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(
        "OutputFormatLoader-consumer").build()).submit(new ConsumerThread(context));
    return writer;
  }
SqoopRecordWriter handles writes to toDataFormat; SqoopOutputFormatDataReader handles reads from it.
private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

  @Override
  public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
    // Acquire the free semaphore, waiting for the previous read to complete
    free.acquire();
    checkIfConsumerThrew();
    toDataFormat.setCSVTextData(key.toString());
    // Release the filled semaphore, signaling that a write has completed
    filled.release();
  }
}

private class SqoopOutputFormatDataReader extends DataReader {

  @Override
  public Object[] readArrayRecord() throws InterruptedException {
    // Acquire the filled semaphore, waiting for a write to complete
    acquireSema();
    // If all data has already been written, return null
    if (writerFinished) {
      return null;
    }
    try {
      return toDataFormat.getObjectData();
    } finally {
      // Release the free semaphore, signaling that the read has completed
      releaseSema();
    }
  }

  private void acquireSema() throws InterruptedException {
    try {
      filled.acquire();
    } catch (InterruptedException ex) {
      throw ex;
    }
  }

  private void releaseSema() {
    free.release();
  }
}
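checkIfConsumerThrew is called in write() but not listed here; judging from the Sqoop2 source, it is essentially the following guard (a sketch, relying on the waitForConsumer method shown further below):

private void checkIfConsumerThrew() {
  // If the consumer died (readerFinished was set in its catch block),
  // waitForConsumer() rethrows the consumer's exception in the writer thread
  if (readerFinished) {
    waitForConsumer();
  }
}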
The filled and free semaphores force writes and reads to alternate strictly. Because filled starts at 0, the write necessarily happens first.
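The handoff can be boiled down to a minimal, self-contained sketch; the Handoff class and its String slot are illustrative stand-ins, not Sqoop code:

import java.util.concurrent.Semaphore;

// Minimal sketch of the filled/free ping-pong used above: a single shared
// slot, a writer that must wait for the slot to be free, and a reader that
// must wait for it to be filled.
public class Handoff {
  private final Semaphore filled = new Semaphore(0, true); // reader waits on this
  private final Semaphore free = new Semaphore(1, true);   // writer waits on this
  private String slot; // stands in for toDataFormat

  public void write(String record) throws InterruptedException {
    free.acquire();   // wait until the reader has drained the slot
    slot = record;
    filled.release(); // hand the slot to the reader
  }

  public String read() throws InterruptedException {
    filled.acquire(); // wait until the writer has filled the slot
    try {
      return slot;
    } finally {
      free.release(); // hand the slot back to the writer
    }
  }
}

Because the slot holds exactly one record, each write blocks until the previous record has been drained, which is the same strict alternation the Sqoop2 executor depends on.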
ConsumerThread reads the data out of toDataFormat and then delivers it to the destination.
private class ConsumerThread implements Runnable {

  private final JobContext jobctx;

  public ConsumerThread(final JobContext context) {
    jobctx = context;
  }

  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Override
  public void run() {
    LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
    try {
      // Instantiate the DataReader used to read data out of toDataFormat
      DataReader reader = new SqoopOutputFormatDataReader();
      Configuration conf = context.getConfiguration();
      // Instantiate the Loader responsible for processing the data
      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
      PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
      Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
      Object connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
      LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema());
      LOG.info("Running loader class " + loaderName);
      // Store the data at the destination
      loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
      LOG.info("Loader has finished");
      ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
          loader.getRowsWritten());
    } catch (Throwable t) {
      // If loading fails, set readerFinished to true and release the free
      // semaphore so the writer thread is not left waiting forever
      readerFinished = true;
      LOG.error("Error while loading data out of MR job.", t);
      free.release();
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
    }
    if (!writerFinished) {
      // The reader stopped before the writer did, so some data was never processed
      readerFinished = true;
      LOG.error("Reader terminated, but writer is still running!");
      free.release();
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
    }
    readerFinished = true;
  }
}
Loader has several subclasses, one per destination type: for example, GenericJdbcLoader loads into SQL databases and KafkaLoader loads into Kafka. GenericJdbcLoader serves as the example below.
public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  public static final int DEFAULT_ROWS_PER_BATCH = 100;
  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, LinkConfiguration linkConfig,
      ToJobConfiguration toJobConfig) throws Exception {
    String driver = linkConfig.linkConfig.jdbcDriver;
    String url = linkConfig.linkConfig.connectionString;
    String username = linkConfig.linkConfig.username;
    String password = linkConfig.linkConfig.password;
    // Instantiate the JdbcExecutor used to run SQL statements
    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
    executor.setAutoCommit(false);
    String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
    // Begin batched writes
    executor.beginBatch(sql);
    try {
      int numberOfRowsPerBatch = 0;
      int numberOfBatchesPerTransaction = 0;
      Object[] array;
      // Pull records from the DataReader in a loop until the writer stops
      while ((array = context.getDataReader().readArrayRecord()) != null) {
        numberOfRowsPerBatch++;
        executor.addBatch(array, context.getSchema());
        // Each batch holds 100 rows; every 100 batches the transaction is committed
        if (numberOfRowsPerBatch == rowsPerBatch) {
          numberOfBatchesPerTransaction++;
          if (numberOfBatchesPerTransaction == batchesPerTransaction) {
            executor.executeBatch(true);
            numberOfBatchesPerTransaction = 0;
          } else {
            executor.executeBatch(false);
          }
          numberOfRowsPerBatch = 0;
        }
        rowsWritten++;
      }
      if (numberOfRowsPerBatch != 0 || numberOfBatchesPerTransaction != 0) {
        // Flush and commit whatever remains
        executor.executeBatch(true);
      }
      executor.endBatch();
    } finally {
      executor.close();
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}
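With the defaults, a transaction is committed every rowsPerBatch × batchesPerTransaction = 100 × 100 = 10,000 rows. The Loader contract itself is small: keep calling context.getDataReader().readArrayRecord() until it returns null, then report the row count. A hypothetical LogLoader (the class name and body are illustrative, not part of Sqoop2) shows the minimal shape:

// Hypothetical Loader that only logs records; it illustrates the contract
// GenericJdbcLoader follows. Imports from the Sqoop2 connector SDK
// (e.g. org.apache.sqoop.job.etl.*) are omitted here.
public class LogLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, LinkConfiguration linkConfig,
      ToJobConfiguration toJobConfig) throws Exception {
    Object[] record;
    // readArrayRecord() blocks on the filled semaphore and returns null
    // once the writer side has finished
    while ((record = context.getDataReader().readArrayRecord()) != null) {
      System.out.println(java.util.Arrays.toString(record));
      rowsWritten++;
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}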
When the MapReduce task finishes, the framework calls close() on the RecordWriter, which shuts the pipeline down:

private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

  @Override
  public void close(TaskAttemptContext context) throws InterruptedException, IOException {
    // Acquire the free semaphore, waiting for the last read to complete
    free.acquire();
    // Mark the writer as finished
    writerFinished = true;
    // Release the filled semaphore to wake the reader thread
    filled.release();
    // Wait for the consumer thread to finish
    waitForConsumer();
    LOG.info("SqoopOutputFormatLoadExecutor::SqoopRecordWriter is closed");
  }
}

private void waitForConsumer() {
  try {
    // Block until the consumer's Future completes
    consumerFuture.get();
  } catch (ExecutionException ex) {
    Throwable t = ex.getCause();
    if (t instanceof SqoopException) {
      throw (SqoopException) t;
    }
    Throwables.propagate(t);
  } catch (Exception ex) {
    throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
  }
}
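The shutdown sequence follows directly from the semaphore protocol: close() first acquires free, which guarantees the reader has drained the last record; it then sets writerFinished and releases filled, so the reader's next filled.acquire() succeeds, readArrayRecord() sees writerFinished and returns null, the Loader's read loop exits, and the consumer thread completes. At that point consumerFuture.get() returns, and any exception thrown in the consumer is rethrown in the writer's thread.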
To summarize: Sqoop2 implements its own OutputFormat in order to customize the output side of MapReduce.
The OutputFormat uses a two-thread design: one thread receives the output of the MapReduce job, and the other processes the records it reads and stores them at the destination.
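Wiring the Handoff sketch from earlier into the same two-thread shape gives a small runnable demo; the class name, the record strings, and the "EOF" end-of-data marker are assumptions for illustration only:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative driver for the Handoff sketch above: one producer thread
// plays the role of SqoopRecordWriter, one consumer plays the Loader.
public class HandoffDemo {
  public static void main(String[] args) throws Exception {
    Handoff handoff = new Handoff();
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // Consumer: keeps reading until it sees the end-of-data marker,
    // mirroring the Loader loop that stops when readArrayRecord() is null
    Future<?> consumer = pool.submit(() -> {
      try {
        String record;
        while (!(record = handoff.read()).equals("EOF")) {
          System.out.println("loaded: " + record);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Producer: writes records, then the marker, then waits for the consumer,
    // mirroring SqoopRecordWriter.write() followed by close()
    for (int i = 0; i < 3; i++) {
      handoff.write("record-" + i);
    }
    handoff.write("EOF");
    consumer.get(); // like waitForConsumer(): propagate consumer failures
    pool.shutdown();
  }
}

The final consumer.get() plays the same role as waitForConsumer() in the real executor: it blocks until the consumer finishes and surfaces any failure in the producer's thread.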