Flink 从 0 到 1 学习 —— 如何自定义 Data Sink ?

前言

前篇文章 《从0到1学习Flink》—— Data Sink 介绍 介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义本身的 Sink 呢?这篇文章将写一个 demo 教你们将从 Kafka Source 的数据 Sink 到 MySQL 中去。java

准备工做

咱们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你须要安装好了 FLink 和 Kafka 。mysql

运行启动 Flink、Zookepeer、Kafka,git

好了,都启动了!github

数据库建表

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;复制代码

实体类

Student.javasql

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}复制代码

工具类

工具类往 kafka topic student 发送数据数据库

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往kafka中写数据
 * 可使用这个main函数进行测试一下
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils2 {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic 须要和 flink 程序用同一个 topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("发送数据: " + JSON.toJSONString(student));
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}复制代码

SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,而后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。apache

package com.zhisheng.flink.sink;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SinkToMySQL extends RichSinkFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中创建链接,这样不用每次 invoke 的时候都要创建链接和释放链接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //关闭链接和释放资源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        //组装数据,执行插入操做
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}复制代码

Flink 程序

这里的 source 是从 kafka 读取数据的,而后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,而后在 addSink 中使用咱们建立的 SinkToMySQL,这样就能够把数据存储到 MySQL 了。json

package com.zhisheng.flink;

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //这个 kafka topic 须要和上面的工具类的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象

        student.addSink(new SinkToMySQL()); //数据 sink 到 mysql

        env.execute("Flink add sink");
    }
}复制代码

结果

运行 Flink 程序,而后再运行 KafkaUtils2.java 工具类,这样就能够了。bootstrap

若是数据插入成功了,那么咱们查看下咱们的数据库:api

数据库中已经插入了 100 条咱们从 Kafka 发送的数据了。证实咱们的 SinkToMySQL 起做用了。是否是很简单?

项目结构

怕你们不知道个人项目结构,这里发个截图看下:

最后

本文主要利用一个 demo,告诉你们如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,若是你项目中有其余的数据来源,你也能够换成对应的 Source,也有可能你的 Sink 是到其余的地方或者其余不一样的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。

关注我

转载请务必注明原创地址为:www.54tianzhisheng.cn/2018/10/31/…

微信公众号:zhisheng

另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号(zhisheng)了,你能够回复关键字:Flink 便可无条件获取到。另外也能够加我微信 你能够加个人微信:yuanblog_tzs,探讨技术!

更多私密资料请加入知识星球!

Github 代码仓库

github.com/zhisheng17/…

之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客

博客

一、Flink 从0到1学习 —— Apache Flink 介绍

二、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

三、Flink 从0到1学习 —— Flink 配置文件详解

四、Flink 从0到1学习 —— Data Source 介绍

五、Flink 从0到1学习 —— 如何自定义 Data Source ?

六、Flink 从0到1学习 —— Data Sink 介绍

七、Flink 从0到1学习 —— 如何自定义 Data Sink ?

八、Flink 从0到1学习 —— Flink Data transformation(转换)

九、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows

十、Flink 从0到1学习 —— Flink 中的几种 Time 详解

十一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch

十二、Flink 从0到1学习 —— Flink 项目如何运行?

1三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka

1四、Flink 从0到1学习 —— Flink JobManager 高可用性配置

1五、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍

1六、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL

1七、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ

1八、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase

1九、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS

20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis

2一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra

2二、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume

2三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB

2四、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ

2五、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了

2六、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了

2七、阿里巴巴开源的 Blink 实时计算框架真香

2八、Flink 从0到1学习 —— Flink 中如何管理配置?

2九、Flink 从0到1学习—— Flink 不能够连续 Split(分流)?

30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

3一、Flink 架构、原理与部署测试

3二、为何说流处理即将来?

3三、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库

3四、流计算框架 Flink 与 Storm 的性能对比

3五、Flink状态管理和容错机制介绍

3六、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

3七、360深度实践:Flink与Storm协议级对比

3八、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

3九、Apache Flink 1.9 重大特性提早解读

40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

4一、Flink 灵魂两百问,这谁顶得住?

4二、Flink 从0到1学习 —— 如何使用 Side Output 来分流?

4三、你公司到底需不须要引入实时计算引擎?

4四、一文让你完全了解大数据实时计算引擎 Flink

源码解析

一、Flink 源码解析 —— 源码编译运行

二、Flink 源码解析 —— 项目结构一览

三、Flink 源码解析—— local 模式启动流程

四、Flink 源码解析 —— standalone session 模式启动流程

五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动

六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动

七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程

八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程

九、Flink 源码解析 —— 如何获取 JobGraph?

十、Flink 源码解析 —— 如何获取 StreamGraph?

十一、Flink 源码解析 —— Flink JobManager 有什么做用?

十二、Flink 源码解析 —— Flink TaskManager 有什么做用?

1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制

1六、Flink 源码解析 —— 深度解析 Flink 序列化机制

1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?

1八、Flink Metrics 源码解析 —— Flink-metrics-core

1九、Flink Metrics 源码解析 —— Flink-metrics-datadog

20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源码解析 —— Flink-metrics-graphite

2二、Flink Metrics 源码解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源码解析 —— Flink-metrics-jmx

2四、Flink Metrics 源码解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源码解析 —— Flink-metrics-statsd

2六、Flink Metrics 源码解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源码解析

2七、Flink 源码解析 —— 如何获取 ExecutionGraph ?

2八、大数据重磅炸弹——实时计算框架 Flink

2九、Flink Checkpoint-轻量级分布式快照

30、Flink Clients 源码解析原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng

相关文章
相关标签/搜索