前篇文章 《从0到1学习Flink》—— Data Sink 介绍 介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义本身的 Sink 呢?这篇文章将写一个 demo 教你们将从 Kafka Source 的数据 Sink 到 MySQL 中去。java
咱们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你须要安装好了 FLink 和 Kafka 。mysql
运行启动 Flink、Zookepeer、Kafka,git
好了,都启动了!github
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
`password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
`age` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;复制代码
Student.javasql
package com.zhisheng.flink.model;
/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class Student {
public int id;
public String name;
public String password;
public int age;
public Student() {
}
public Student(int id, String name, String password, int age) {
this.id = id;
this.name = name;
this.password = password;
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", password='" + password + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}复制代码
工具类往 kafka topic student 发送数据数据库
import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* 往kafka中写数据
* 可使用这个main函数进行测试一下
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class KafkaUtils2 {
public static final String broker_list = "localhost:9092";
public static final String topic = "student"; //kafka topic 须要和 flink 程序用同一个 topic
public static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(props);
for (int i = 1; i <= 100; i++) {
Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
producer.send(record);
System.out.println("发送数据: " + JSON.toJSONString(student));
}
producer.flush();
}
public static void main(String[] args) throws InterruptedException {
writeToKafka();
}
}复制代码
该类就是 Sink Function,继承了 RichSinkFunction ,而后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。apache
package com.zhisheng.flink.sink;
import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class SinkToMySQL extends RichSinkFunction<Student> {
PreparedStatement ps;
private Connection connection;
/**
* open() 方法中创建链接,这样不用每次 invoke 的时候都要创建链接和释放链接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
//关闭链接和释放资源
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Student value, Context context) throws Exception {
//组装数据,执行插入操做
ps.setInt(1, value.getId());
ps.setString(2, value.getName());
ps.setString(3, value.getPassword());
ps.setInt(4, value.getAge());
ps.executeUpdate();
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName("com.mysql.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
}
return con;
}
}复制代码
这里的 source 是从 kafka 读取数据的,而后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,而后在 addSink 中使用咱们建立的 SinkToMySQL,这样就能够把数据存储到 MySQL 了。json
package com.zhisheng.flink;
import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import java.util.Properties;
/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class Main3 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
"student", //这个 kafka topic 须要和上面的工具类的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1)
.map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象
student.addSink(new SinkToMySQL()); //数据 sink 到 mysql
env.execute("Flink add sink");
}
}复制代码
运行 Flink 程序,而后再运行 KafkaUtils2.java 工具类,这样就能够了。bootstrap
若是数据插入成功了,那么咱们查看下咱们的数据库:api
数据库中已经插入了 100 条咱们从 Kafka 发送的数据了。证实咱们的 SinkToMySQL 起做用了。是否是很简单?
怕你们不知道个人项目结构,这里发个截图看下:
本文主要利用一个 demo,告诉你们如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,若是你项目中有其余的数据来源,你也能够换成对应的 Source,也有可能你的 Sink 是到其余的地方或者其余不一样的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。
转载请务必注明原创地址为:www.54tianzhisheng.cn/2018/10/31/…
微信公众号:zhisheng
另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号(zhisheng)了,你能够回复关键字:Flink 便可无条件获取到。另外也能够加我微信 你能够加个人微信:yuanblog_tzs,探讨技术!
更多私密资料请加入知识星球!
之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客
一、Flink 从0到1学习 —— Apache Flink 介绍
二、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
三、Flink 从0到1学习 —— Flink 配置文件详解
四、Flink 从0到1学习 —— Data Source 介绍
五、Flink 从0到1学习 —— 如何自定义 Data Source ?
六、Flink 从0到1学习 —— Data Sink 介绍
七、Flink 从0到1学习 —— 如何自定义 Data Sink ?
八、Flink 从0到1学习 —— Flink Data transformation(转换)
九、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows
十、Flink 从0到1学习 —— Flink 中的几种 Time 详解
十一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch
十二、Flink 从0到1学习 —— Flink 项目如何运行?
1三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka
1四、Flink 从0到1学习 —— Flink JobManager 高可用性配置
1五、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍
1六、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL
1七、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ
1八、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase
1九、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS
20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis
2一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra
2二、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume
2三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB
2四、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ
2五、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了
2六、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了
2八、Flink 从0到1学习 —— Flink 中如何管理配置?
2九、Flink 从0到1学习—— Flink 不能够连续 Split(分流)?
30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
3二、为何说流处理即将来?
3三、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库
3六、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
3八、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)
4二、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
四、Flink 源码解析 —— standalone session 模式启动流程
五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程
八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程
九、Flink 源码解析 —— 如何获取 JobGraph?
十、Flink 源码解析 —— 如何获取 StreamGraph?
十一、Flink 源码解析 —— Flink JobManager 有什么做用?
十二、Flink 源码解析 —— Flink TaskManager 有什么做用?
1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程
1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程
1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制
1六、Flink 源码解析 —— 深度解析 Flink 序列化机制
1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?
1八、Flink Metrics 源码解析 —— Flink-metrics-core
1九、Flink Metrics 源码解析 —— Flink-metrics-datadog
20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard
2一、Flink Metrics 源码解析 —— Flink-metrics-graphite
2二、Flink Metrics 源码解析 —— Flink-metrics-influxdb
2三、Flink Metrics 源码解析 —— Flink-metrics-jmx
2四、Flink Metrics 源码解析 —— Flink-metrics-slf4j
2五、Flink Metrics 源码解析 —— Flink-metrics-statsd
2六、Flink Metrics 源码解析 —— Flink-metrics-prometheus
2七、Flink 源码解析 —— 如何获取 ExecutionGraph ?
30、Flink Clients 源码解析原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng