Flink 之 Data Sink

首先 Sink 的中文释义为:java

下沉; 下陷; 沉没; 使下沉; 使沉没; 倒下; 坐下;

 因此,对应 Data sink 意思有点把数据存储下来(落库)的意思;ide

 

Source  数据源  ---- > Compute  计算 -----> sink 落库

如上图,Source 就是数据的来源,中间的 Compute 其实就是 Flink 干的事情,能够作一系列的操做,操做完后就把计算后的数据结果 Sink 到某个地方。(能够是 MySQL、ElasticSearch、Kafka、Cassandra 等)。this

这里我说下本身目前作告警这块就是把 Compute 计算后的结果 Sink 直接告警出来了(发送告警消息到钉钉群、邮件、短信等),这个 sink 的意思也不必定非得说成要把数据存储到某个地方去。spa

其实官网用的 Connector 来形容要去的地方更合适,这个 Connector 能够有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。3d

 

Data Source 介绍了 Flink Data Source 有哪些,这里也看看 Flink Data Sink 支持的有哪些:code

 

 

 

看下源码有哪些呢?orm

能够看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。blog

 

 

从上图能够看到 SinkFunction 接口有 invoke 方法,它有一个 RichSinkFunction 抽象类。继承

上面的那些自带的 Sink 能够看到都是继承了 RichSinkFunction 抽象类,实现了其中的方法,那么咱们要是本身定义本身的 Sink 的话其实也是要按照这个套路来作的。接口

这里就拿个较为简单的 PrintSinkFunction 源码来说下:

@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
	private static final long serialVersionUID = 1L;

	private static final boolean STD_OUT = false;
	private static final boolean STD_ERR = true;

	private boolean target;
	private transient PrintStream stream;
	private transient String prefix;

	/**
	 * Instantiates a print sink function that prints to standard out.
	 */
	public PrintSinkFunction() {}

	/**
	 * Instantiates a print sink function that prints to standard out.
	 *
	 * @param stdErr True, if the format should print to standard error instead of standard out.
	 */
	public PrintSinkFunction(boolean stdErr) {
		target = stdErr;
	}

	public void setTargetToStandardOut() {
		target = STD_OUT;
	}

	public void setTargetToStandardErr() {
		target = STD_ERR;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
		// get the target stream
		stream = target == STD_OUT ? System.out : System.err;

		// set the prefix if we have a >1 parallelism
		prefix = (context.getNumberOfParallelSubtasks() > 1) ?
				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
	}

	@Override
	public void invoke(IN record) {
		if (prefix != null) {
			stream.println(prefix + record.toString());
		}
		else {
			stream.println(record.toString());
		}
	}

	@Override
	public void close() {
		this.stream = null;
		this.prefix = null;
	}

	@Override
	public String toString() {
		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
	}
}

  

能够看到它就是实现了 RichSinkFunction 抽象类,而后实现了 invoke 方法,这里 invoke 方法就是把记录打印出来了就是,没作其余的额外操做。

如何使用?

SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

  

这样就能够了,若是是其余的 Sink Function 的话须要换成对应的。

使用这个 Function 其效果就是打印从 Source 过来的数据,和直接 Source.print() 效果同样。

 

 

 

下篇文章咱们将讲解下如何自定义本身的 Sink Function,并使用一个 demo 来教你们,让你们知道这个套路,且可以在本身工做中自定义本身须要的 Sink Function,来完成本身的工做需求。

最后

本文主要讲了下 Flink 的 Data Sink,并介绍了常见的 Data Sink,也看了下源码的 SinkFunction,介绍了一个简单的 Function 使用, 告诉了你们自定义 Sink Function 的套路,下篇文章带你们写个。

 

原创地址为:http://www.54tianzhisheng.cn/2018/10/29/flink-sink/

相关文章
相关标签/搜索