Reference: http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/
Reference: https://github.com/wuchong/flink-sql-submit
1) Write a minimal SQL script
-- enable mini-batch
-- SET table.exec.mini-batch.enabled=true;
-- mini-batch interval, i.e. the extra latency the job is allowed to add
-- SET table.exec.mini-batch.allow-latency=1s;
-- max number of records buffered in one mini-batch
-- SET table.exec.mini-batch.size=1000;
-- enable the local-global aggregation optimization
-- SET table.optimizer.agg-phase-strategy=TWO_PHASE;
-- enable distinct-agg split
-- SET table.optimizer.distinct-agg.split.enabled=true;

-- source
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- sink
CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '1'
);

INSERT INTO pvuv_sink
SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
    COUNT(*) AS pv,
    COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
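Note that the JDBC connector does not create the target table; it must already exist in MySQL. A minimal sketch of the table DDL, run against the flink-test database (the VARCHAR length is an assumption):

CREATE TABLE pvuv_sink (
    dt VARCHAR(20),
    pv BIGINT,
    uv BIGINT
);

On the source side, since format.derive-schema is set, each Kafka message in user_behavior is expected to be a JSON object matching the source schema, e.g. (values made up for illustration): {"user_id": "543462", "item_id": "1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}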
2) Run the main class, passing in the arguments
-w G:\\flink-neiwang-dev\\flink-zeppelin-scala\\src\\main\\resources -f test.sql
You can change these freely; all that matters is that the program receives the SQL file location. You could even hard-code it, as in the line below:
List<String> sql = Files.readAllLines(Paths.get("G:\\flink-neiwang-dev\\flink-zeppelin-scala\\src\\main\\resources\\test.sql"));
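In the full class below, the path comes instead from CliOptionsParser. That helper is not shown in this post; here is a minimal sketch of what the two classes could look like, assuming only the -w (working space) and -f (SQL file name) flags used above. The real versions live in the flink-sql-submit repository linked at the top.

// Minimal sketches of the two CLI helpers referenced by SqlSubmit.
class CliOptions {
    private final String sqlFilePath;
    private final String workingSpace;

    CliOptions(String sqlFilePath, String workingSpace) {
        this.sqlFilePath = sqlFilePath;
        this.workingSpace = workingSpace;
    }

    String getSqlFilePath() { return sqlFilePath; }
    String getWorkingSpace() { return workingSpace; }
}

class CliOptionsParser {
    static CliOptions parseClient(String[] args) {
        String sqlFile = null;
        String workSpace = null;
        // Walk the args as flag/value pairs.
        for (int i = 0; i < args.length - 1; i++) {
            if ("-f".equals(args[i])) {
                sqlFile = args[i + 1];
            } else if ("-w".equals(args[i])) {
                workSpace = args[i + 1];
            }
        }
        if (sqlFile == null || workSpace == null) {
            throw new IllegalArgumentException("Usage: -w <workingSpace> -f <sqlFile>");
        }
        return new CliOptions(sqlFile, workSpace);
    }
}

With that in place, the complete SqlSubmit class is: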
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SqlSubmit {

    private String sqlFilePath;
    private String workSpace;
    private TableEnvironment tEnv;

    public static void main(String[] args) throws Exception {
        // TODO: the SQL file location is passed in on the command line
        final CliOptions options = CliOptionsParser.parseClient(args);
        SqlSubmit submit = new SqlSubmit(options);
        submit.run();
    }

    // --------------------------------------------------------------------------------------------

    private SqlSubmit(CliOptions options) {
        this.sqlFilePath = options.getSqlFilePath();
        this.workSpace = options.getWorkingSpace();
    }

    private void run() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        this.tEnv = TableEnvironment.create(settings);
        List<String> sql = Files.readAllLines(Paths.get(workSpace + "/" + sqlFilePath));
        // List<String> sql = Files.readAllLines(Paths.get("G:\\flink-neiwang-dev\\flink-zeppelin-scala\\src\\main\\resources\\test.sql"));
        List<SqlCommandParser.SqlCommandCall> calls = SqlCommandParser.parse(sql);
        for (SqlCommandParser.SqlCommandCall call : calls) {
            callCommand(call);
        }
        tEnv.execute("SQL Job");
    }

    // --------------------------------------------------------------------------------------------

    private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
        switch (cmdCall.command) {
            case SET:
                callSet(cmdCall);
                break;
            case CREATE_TABLE:
                callCreateTable(cmdCall);
                break;
            case INSERT_INTO:
                callInsertInto(cmdCall);
                break;
            default:
                throw new RuntimeException("Unsupported command: " + cmdCall.command);
        }
    }

    private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
        String key = cmdCall.operands[0];
        String value = cmdCall.operands[1];
        tEnv.getConfig().getConfiguration().setString(key, value);
    }

    private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
        String ddl = cmdCall.operands[0];
        try {
            tEnv.sqlUpdate(ddl);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + ddl + "\n", e);
        }
    }

    private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
        String dml = cmdCall.operands[0];
        try {
            tEnv.sqlUpdate(dml);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + dml + "\n", e);
        }
    }
}
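The main class also depends on SqlCommandParser, which turns the raw lines of the SQL file into a list of commands. Below is a minimal sketch, assuming statements are separated by ';' and that '--' starts a comment; the real parser in flink-sql-submit is regex-based and handles more command types.

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal sketch of the parser interface that SqlSubmit depends on.
class SqlCommandParser {

    enum SqlCommand { SET, CREATE_TABLE, INSERT_INTO }

    static class SqlCommandCall {
        final SqlCommand command;
        final String[] operands;
        SqlCommandCall(SqlCommand command, String[] operands) {
            this.command = command;
            this.operands = operands;
        }
    }

    private static final Pattern SET_PATTERN =
            Pattern.compile("SET\\s+(\\S+)\\s*=\\s*(\\S+)", Pattern.CASE_INSENSITIVE);

    static List<SqlCommandCall> parse(List<String> lines) {
        List<SqlCommandCall> calls = new ArrayList<>();
        // Strip '--' comments, then split the remaining text on ';'.
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            int comment = line.indexOf("--");
            sb.append(comment >= 0 ? line.substring(0, comment) : line).append('\n');
        }
        for (String stmt : sb.toString().split(";")) {
            String sql = stmt.trim();
            if (sql.isEmpty()) continue;
            String upper = sql.toUpperCase();
            if (upper.startsWith("SET")) {
                Matcher m = SET_PATTERN.matcher(sql);
                if (m.find()) {
                    calls.add(new SqlCommandCall(SqlCommand.SET,
                            new String[]{m.group(1), m.group(2)}));
                }
            } else if (upper.startsWith("CREATE TABLE")) {
                calls.add(new SqlCommandCall(SqlCommand.CREATE_TABLE, new String[]{sql}));
            } else if (upper.startsWith("INSERT INTO")) {
                calls.add(new SqlCommandCall(SqlCommand.INSERT_INTO, new String[]{sql}));
            }
        }
        return calls;
    }
}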
3) Run it. If you want to understand the details, just step through with a debugger. This setup makes life easy for developers: they only need to write a SQL file, and the code submits it to the cluster remotely, with no packaging step. (For submitting jobs to a cluster from code, see my earlier article.)