Flink源码分析之深度解读流式数据写入hive

  • 前言java

  • 数据流处理linux

  • hive基本信息获取sql

  • 流、批判断数据库

  • 写入格式判断apache

  • 构造分区提交算子微信

  • 详解StreamingFileWriter并发

  • 简述StreamingFileSinkapp

  • 分区信息提交ide

  • 提交分区算子oop

  • 分区提交触发器

  • 分区提交策略

  • 总结

前言

前段时间咱们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天咱们来从源码的角度深刻分析一下。以便朋友们对flink流式数据写入hive有一个深刻的了解,以及在出现问题的时候知道该怎么调试。

其实咱们能够想一下这个工做大概是什么流程,首先要写入hive,咱们首先要从hive的元数据里拿到相关的hive表的信息,好比存储的路径是哪里,以便往那个目录写数据,还有存储的格式是什么,orc仍是parquet,这样咱们须要调用对应的实现类来进行写入,其次这个表是不是分区表,写入数据是动态分区仍是静态分区,这些都会根据场景的不一样而选择不一样的写入策略。

写入数据的时候确定不会把全部数据写入一个文件,那么文件的滚动策略是什么呢?写完了数据咱们如何更新hive的元数据信息,以便咱们能够及时读取到相应的数据呢?

我画了一个简单的流程图,你们能够先看下,接下来咱们带着这些疑问,一步步的从源码里探索这些功能是如何实现的。

数据流处理

咱们此次主要是分析flink如何将相似kafka的流式数据写入到hive表,咱们先来一段简单的代码:

//构造hive catalog		String name = "myhive";		String defaultDatabase = "default";		String hiveConfDir = "/Users/user/work/hive/conf"; // a local path		String version = "3.1.2";		HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);		tEnv.registerCatalog("myhive", hive);		tEnv.useCatalog("myhive");		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);		tEnv.useDatabase("db1");		tEnv.createTemporaryView("kafka_source_table", dataStream);		String insertSql = "insert into  hive.db1.fs_table SELECT userId, amount, " +		                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";		tEnv.executeSql(insertSql);

系统在启动的时候会首先解析sql,获取相应的属性,而后会经过java的SPI机制加载TableFactory的全部子类,包含TableSourceFactory和TableSinkFactory,以后,会根据从sql中解析的属性循环判断使用哪一个工厂类,具体的操做是在TableFactoryUtil类的方法里面实现的。

好比对于上面的sql,解析以后,发现是要写入一个表名为hive.db1.fs_table的hive sink。因此系统在调用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法之后,获得了TableSinkFactory的子类HiveTableFactory,而后调用相应的createTableSink方法来建立相应的sink,也就是HiveTableSink。

咱们来简单看下HiveTableSink的变量和结构。

/** * Table sink to write to Hive tables. */public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {	private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);	private final boolean userMrWriter;	//是否有界,用来区分是批处理仍是流处理	private final boolean isBounded;	private final JobConf jobConf;	private final CatalogTable catalogTable;	private final ObjectIdentifier identifier;	private final TableSchema tableSchema;	private final String hiveVersion;	private final HiveShim hiveShim;	private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();	private boolean overwrite = false;	private boolean dynamicGrouping = false;

咱们看到它实现了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三个接口,这三个接口决定了hive sink实现的功能,数据只能是append模式的,数据是可分区的、而且数据是能够被覆盖写的。

类里面的这些变量,看名字就大概知道是什么意思了,就不作解释了,讲一下HiveShim,咱们在构造方法里看到hiveShim是和hive 的版本有关的,因此其实这个类咱们能够理解为对不一样hive版本操做的一层封装。

hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);

tablesink处理数据流的方法是consumeDataStream,咱们来重点分析下。

hive基本信息获取

首先会经过hive的配置链接到hive的元数据库,获得hive表的基本信息。

String[] partitionColumns = getPartitionKeys().toArray(new String[0]);		String dbName = identifier.getDatabaseName();		String tableName = identifier.getObjectName();		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(				new HiveConf(jobConf, HiveConf.class), hiveVersion)) {			Table table = client.getTable(dbName, tableName);			StorageDescriptor sd = table.getSd();
  • 获取到hive的表的信息,也就是Table对象。

  • 获取表的一些存储信息,StorageDescriptor对象,这里面包含了hive表的存储路径、存储格式等等。

流、批判断

接下来判断写入hive是批处理仍是流处理

if (isBounded){   ......   //batch    } else {   ......   //streaming    }

因为此次咱们主要分析flink的流处理,因此对于batch就暂且跳过,进入else,也就是流处理。

在这里,定义了一些基本的配置:

  • 桶分配器TableBucketAssigner,简单来讲就是如何肯定数据的分区,好比按时间,仍是按照字段的值等等。

  • 滚动策略,如何生成下一个文件,按照时间,仍是文件的大小等等。

  • 构造bulkFactory,目前只有parquet和orc的列存储格式使用bulkFactory

//桶分配器	TableBucketAssigner assigner = new TableBucketAssigner(partComputer);		//滚动策略	TableRollingPolicy rollingPolicy = new TableRollingPolicy(						true,						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());    //构造bulkFactory	Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);

createBulkWriterFactory方法主要是用于构造写入列存储格式的工厂类,目前只支持parquet和orc格式,首先定义用于构造工厂类的一些参数,好比字段的类型,名称等等,以后根据不一样类型构造不一样的工厂类。若是是parquet格式,最终构造的是ParquetWriterFactory工厂类,若是是orc格式,根据hive的版本不一样,分别构造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。

写入格式判断

若是是使用MR的writer或者是行格式,进入if逻辑,使用HadoopPathBasedBulkFormatBuilder,若是是列存储格式,进入else逻辑,使用StreamingFileSink来写入数据.

if (userMrWriter || !bulkFactory.isPresent()) {					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);					builder = new HadoopPathBasedBulkFormatBuilder<>(							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");				} else {					builder = StreamingFileSink.forBulkFormat(							new org.apache.flink.core.fs.Path(sd.getLocation()),							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))							.withBucketAssigner(assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use native parquet&orc writer.");				}

在大数据处理中,列式存储比行存储有着更好的查询效率,因此咱们此次以列式存储为主,聊聊StreamingFileSink是如何写入列式数据的。经过代码咱们看到在构造buckets builder的时候,使用了前面刚生成的bucket assigner、输出的配置、以及文件滚动的策略。

构造分区提交算子

在HiveTableSink#consumeDataStream方法的最后,进入了FileSystemTableSink#createStreamingSink方法,这个方法主要作了两件事情,一个是建立了用于流写入的算子StreamingFileWriter,另外一个是当存在分区列而且在配置文件配置了分区文件提交策略的时候,构造了一个用于提交分区文件的算子StreamingFileCommitter,这个算子固定的只有一个并发度。

StreamingFileWriter fileWriter = new StreamingFileWriter(				rollingCheckInterval,				bucketsBuilder);		DataStream<CommitMessage> writerStream = inputStream.transform(				StreamingFileWriter.class.getSimpleName(),				TypeExtractor.createTypeInfo(CommitMessage.class),				fileWriter).setParallelism(inputStream.getParallelism());		DataStream<?> returnStream = writerStream;		// save committer when we don't need it.		if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {			StreamingFileCommitter committer = new StreamingFileCommitter(					path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);			returnStream = writerStream					.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)					.setParallelism(1)					.setMaxParallelism(1);		}

咱们看到在代码中,inputStream通过transform方法,最终将要提交的数据转换成CommitMessage格式,而后发送给它的下游StreamingFileCommitter算子,也就是说StreamingFileCommitter将会接收StreamingFileWriter中收集的数据。

详解StreamingFileWriter

这个StreamingFileWriter咱们能够理解为一个算子级别的写入文件的sink,它对StreamingFileSink进行了一些包装,而后添加了一些其余操做,好比提交分区信息等等。咱们简单看下这个类的结构,并简单聊聊各个方法的做用。

public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>		implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{			@Override	public void initializeState(StateInitializationContext context) throws Exception {	    .........................	}				@Override	public void snapshotState(StateSnapshotContext context) throws Exception {	  .........................	}	@Override	public void processWatermark(Watermark mark) throws Exception {	  .........................	}	@Override	public void processElement(StreamRecord<RowData> element) throws Exception {	  .........................	}	/**	 * Commit up to this checkpoint id, also send inactive partitions to downstream for committing.	 */	@Override	public void notifyCheckpointComplete(long checkpointId) throws Exception {		  .........................	}	    		@Override	public void endInput() throws Exception {		  .........................	}	@Override	public void dispose() throws Exception {	  .........................	}	    		    		   }
  • initializeState :初始化状态的方法,在这里构造了要写入文件的buckets,以及具体写入文件的StreamingFileSinkHelper等等。

  • snapshotState:这个方法主要是进行每次checkpoint的时候调用。

  • processWatermark这个方法经过名字就能看出来,是处理水印的,好比往下游发送水印等等。

  • processElement:处理元素最核心的方法,每来一条数据,都会进入这个方法进行处理。

  • notifyCheckpointComplete,每次checkpoint完成的时候调用该方法。在这里,收集了一些要提交的分区的信息,用于分区提交。

  • endInput:再也不有更多的数据进来,也就是输入结束的时候调用。

  • dispose:算子的生命周期结束的时候调用。

简述StreamingFileSink

StreamingFileSink咱们来简单的描述下,经过名字咱们就能看出来,这是一个用于将流式数据写入文件系统的sink,它集成了checkpoint提供exactly once语义。

在StreamingFileSink里有一个bucket的概念,咱们能够理解为数据写入的目录,每一个bucket下能够写入多个文件。它提供了一个BucketAssigner的概念用于生成bucket,进来的每个数据在写入的时候都会判断下要写入哪一个bucket,默认的实现是DateTimeBucketAssigner,每小时生成一个bucket。

它根据不一样的写入格式分别使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat来进行相应的处理。

此外,该sink还提供了一个RollingPolicy用于决定数据的滚动策略,好比文件到达多大或者通过多久就关闭当前文件,开启下一个新文件。

具体的写入ORC格式的数据,能够参考下这个文章:flink 1.11 流式数据ORC格式写入file,因为咱们此次主要是讲总体写入hive的流程,这个sink就不作太具体的讲解了。

分区信息提交

StreamingFileWriter#notifyCheckpointComplete 调用commitUpToCheckpoint在checkpoint完成的时候触发了分区的提交操做。

private void commitUpToCheckpoint(long checkpointId) throws Exception {		helper.commitUpToCheckpoint(checkpointId);		CommitMessage message = new CommitMessage(				checkpointId,				getRuntimeContext().getIndexOfThisSubtask(),				getRuntimeContext().getNumberOfParallelSubtasks(),				new ArrayList<>(inactivePartitions));		output.collect(new StreamRecord<>(message));		inactivePartitions.clear();	}

在这里,咱们看到,使用inactivePartitions构造了CommitMessage对象,而后使用output.collect将这个提交数据收集起来,也就是上文咱们提到的这里收集到的这个数据将会发给StreamingFileCommitter算子来处理。

而inactivePartitions里面的数据是何时添加进来的呢,也就是何时才会生成要提交的分区呢?咱们跟踪一下代码,发现是给写入文件的buckets添加了一个监听器,在bucket成为非活跃状态以后,触发监听器,而后将对应的bucket id 添加到inactivePartitions集合。

@Override	public void initializeState(StateInitializationContext context) throws Exception {        ..........................		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {			@Override			public void bucketCreated(Bucket<RowData, String> bucket) {			}			@Override			public void bucketInactive(Bucket<RowData, String> bucket) {				inactivePartitions.add(bucket.getBucketId());			}		});	}

而通知bucket变为非活动状态又是什么状况会触发呢?从代码注释咱们看到,到目前为止该bucket已接收的全部记录都已提交后,则该bucket将变为非活动状态。

提交分区算子

这是一个单并行度的算子,用于提交写入文件系统的分区信息。具体的处理步骤以下:

  • 从上游收集要提交的分区信息

  • 判断某一个checkpoint下,全部的子任务是否都已经接收了分区的数据

  • 获取分区提交触发器。(目前支持partition-time和process-time)

  • 使用分区提交策略去依次提交分区信息(能够配置多个分区策略)

这里咱们主要讲一下 StreamingFileCommitter#processElement方法是如何对进来的每一个提交数据进行处理的。

@Override	public void processElement(StreamRecord<CommitMessage> element) throws Exception {		CommitMessage message = element.getValue();		for (String partition : message.partitions) {			trigger.addPartition(partition);		}		if (taskTracker == null) {			taskTracker = new TaskTracker(message.numberOfTasks);		}		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);		if (needCommit) {			commitPartitions(message.checkpointId);		}	}

咱们看到,从上游接收到CommitMessage元素,而后从里面获得要提交的分区,添加到PartitionCommitTrigger里(变量trigger),而后经过taskTracker来判断一下,该checkpoint每一个子任务是否已经接收到了分区数据,最后经过commitPartitions方法来提交分区信息。

进入commitPartitions方法,看看是如何提交分区的。

private void commitPartitions(long checkpointId) throws Exception {		List<String> partitions = checkpointId == Long.MAX_VALUE ?				trigger.endInput() :				trigger.committablePartitions(checkpointId);		if (partitions.isEmpty()) {			return;		}		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {			for (String partition : partitions) {				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));				LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);				Path path = new Path(locationPath, generatePartitionPath(partSpec));				PartitionCommitPolicy.Context context = new PolicyContext(						new ArrayList<>(partSpec.values()), path);				for (PartitionCommitPolicy policy : policies) {					if (policy instanceof MetastoreCommitPolicy) {						((MetastoreCommitPolicy) policy).setMetastore(metaStore);					}					policy.commit(context);				}			}		}	}

从trigger中获取该checkpoint下的全部要提交的分区,放到一个List集合partitions中,在提交的分区不为空的状况下,循环遍历要配置的分区提交策略PartitionCommitPolicy,而后提交分区。

分区提交触发器

目前系统提供了两种分区提交的触发器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分别用于处理何时提交分区。

  • ProcTimeCommitTigger 主要依赖于分区的建立时间和delay,当处理时间大于'partition creation time' + 'delay'的时候,将提交这个分区

  • PartitionTimeCommitTigger 依赖于水印,当水印的值大于 partition-time + delay的时候提交这个分区。

分区提交策略

目前系统提供了一个接口PartitionCommitPolicy,用于提交分区的信息,目前系统提供了如下几种方案,

  • 一种是METASTORE,主要是用于提交hive的分区,好比建立hive分区等等

  • 还有一种是SUCCESS_FILE,也就是往对应的分区目录下写一个success文件。

  • 此外,系统还提供了一个对外的自定义实现,用于用户自定义分区提交,好比提交分区以后合并小文件等等。自定义提交策略的时候,须要实现PartitionCommitPolicy接口,并将提交策略置为custom。

我在网上也看到过一些实现该接口用于合并小文件的示例,可是我我的以为其实有点不太完美,由于这个合并小文件可能会涉及不少的问题:

  • 合并的时候如何保证事务,保证合并的同时如何有读操做不会发生脏读

  • 事务的一致性,若是合并出错了怎么回滚

  • 合并小文件的性能是否跟得上,目前flink只提供了一个单并行度的提交算子。

  • 如何多并发合并写入

因此暂时我也没有想到一个完美的方案用于flink来合并小文件。

总结

经过上述的描述,咱们简单聊了一下flink是如何将流式数据写入hive的,可是可能每一个人在作的过程当中仍是会遇到各类各类的环境问题致使的写入失败,好比window和linux系统的差别,hdfs版本的差别,系统时区的配置等等,在遇到一些个性化的问题以后,就可能须要你们去针对本身的问题去个性化的debug了。

更多干货信息,欢迎关注个人公众号【大数据技术与应用实战】

本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索