Flink SQL 链接Hive并写入/读取数据

1. 添加依赖

<properties>
        <flink.version>1.11.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--       添加flink table api 集成Hive的依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

2. 建立blink版本的批处理Table执行环境

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
  • 通过实际测试,目前HiveTableSink 不支持流式写入(未实现 AppendStreamTableSink),必须是批处理环境才能够往hive里面写入数据,而不能将流式数据写入hive。例如将kafka建立一张临时表,而后将表中的数据流持续插入hive,这是不能够的,官网上1.11版本经过flink sql-client能够实现hive的流式写入,还有待验证。

3. 链接文件系统,建立hive catalog,对表进行操做,相似于Spark on Hive,flink能够直接获取Hive的元数据,并使用flink进行计算。

// 链接外部文件
        bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt"))
                .withFormat(new Csv().fieldDelimiter(','))
                .withSchema(new Schema().field("id", DataTypes.STRING()))
                .createTemporaryTable("output");

        // 设置 hive 方言
        bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 获取hive-site.xml目录
        String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);
        HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);
        bbTableEnv.registerCatalog("hive", hive);

        bbTableEnv.useCatalog("hive");
        bbTableEnv.useDatabase("warningplatform");

        bbTableEnv.executeSql("insert into  test select id from    default_catalog.default_database.output");
  • 经过bbTableEnv.connect()去建立临时表的方式已通过时了,建议使用bbTableEnv.executeSql()的方式,经过DDL去建立临时表,临时表究竟是属于哪个catalog目前还不太肯定,究竟是什么规则目前还不清楚。 查资料得知,临时表与单个Flink会话的生命周期相关,临时表始终存储在内存中。 永久表须要一个catalog来管理表对应的元数据,好比hive metastore,该表将一直存在,直到明确删除该表为止。 所以猜想:default_catalog是存储在内存中,若是在切换成hive catalog以前建立临时表,那咱们就能够使用default_catalog.default_database.tableName来获取这个临时表。 若是切换了catalog再去建立临时表,那咱们就没法获取到临时表了,由于它不在default_catalog中,并且保存在内存里面,直接查询临时表会去当前的catalog里面去查找临时表,所以必定要在default_catalog 里面建立临时表。 而临时视图好像是存储在当前的catalog里面java

  • 经过bbTableEnv.createTemporaryView()建立的视图则是属于当前的database的sql

    bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));
  • 注意1.11版本的执行sql的方法发生了改变,经过执行环境的executeSql(),executeInsert()等来进行插入或者执行sql语句apache

相关文章
相关标签/搜索