Flink offers a rich set of client-side ways to submit jobs. This article extends the RESTful approach; for the other four approaches, see the flink-china tutorial series installment on client operations.
Flink has supported job submission via RESTClient since version 1.7, which many people may not be familiar with. Submitting a job with RESTClient means Flink can be driven through API calls: you submit the Flink code you have written to a Flink cluster for execution. This article demonstrates reading data from Kafka, with the job launched from a Spring Cloud microservice. The concrete steps follow.
Create a Spring Boot parent project and a flink-service module inside it. Add the following code to flink-service, then package the project. Packaging matters because the jars the Flink job needs at runtime are submitted to the cluster together with the job when you use RESTClient.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromsinkKafka {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
        properties.setProperty("group.id", "flinksink");

        // Read from the "flinktest" topic as plain strings
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
                new SimpleStringSchema(), properties));

        // Log each record and pass it through unchanged
        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        }).print();

        env.execute();
    }
}
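For the above class to compile, the flink-service module needs the Flink streaming API and the universal Kafka connector on its classpath. The snippet below is a minimal sketch of the corresponding pom.xml entries; the concrete flink.version and scala.binary.version values are assumptions (any Flink 1.7+ release, which introduced the universal Kafka connector, should work) and must match your cluster.

<properties>
    <!-- Assumed versions; align these with your Flink cluster -->
    <flink.version>1.7.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <!-- Flink streaming API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Universal Kafka connector that provides FlinkKafkaConsumer -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>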
The packaging setup is shared below. Remove the default Spring Cloud packaging plugin spring-boot-maven-plugin from the pom and replace it with the following. Packaging this way produces two jars, flink-service-1.0-SNAPSHOT-kafka.jar and flink-service-1.0-SNAPSHOT.jar: flink-service-1.0-SNAPSHOT-kafka.jar contains the Flink code you wrote, while flink-service-1.0-SNAPSHOT.jar contains the Kafka base, Kafka client, and other classes your Flink program needs at runtime.
<build>
    <plugins>
        <!-- Unpack the Kafka connector, base and client classes into target/classes -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>2.9</version>
            <executions>
                <execution>
                    <id>unpack</id>
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <!-- for kafka connector -->
                            <artifactItem>
                                <groupId>org.apache.flink</groupId>
                                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                                <version>${flink.version}</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/flink/**</includes>
                            </artifactItem>
                            <!-- for kafka base -->
                            <artifactItem>
                                <groupId>org.apache.flink</groupId>
                                <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
                                <version>${flink.version}</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/flink/**</includes>
                            </artifactItem>
                            <!-- for kafka client -->
                            <artifactItem>
                                <groupId>org.apache.kafka</groupId>
                                <artifactId>kafka-clients</artifactId>
                                <version>2.0.1</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/**</includes>
                            </artifactItem>
                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Build a second jar (classifier "kafka") containing only the Flink job class -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <executions>
                <!-- kafka -->
                <execution>
                    <id>kafka</id>
                    <phase>package</phase>
                    <goals>
                        <goal>jar</goal>
                    </goals>
                    <configuration>
                        <classifier>kafka</classifier>
                        <archive>
                            <manifestEntries>
                                <!-- Entry class Flink should run from this jar -->
                                <program-class>com.flink.kafka.ReadFromsinkKafka</program-class>
                            </manifestEntries>
                        </archive>
                        <includes>
                            <include>**/com/flink/kafka/ReadFromsinkKafka.class</include>
                        </includes>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
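With the spring-boot-maven-plugin removed and the build section above in place, running mvn clean package on the parent project should leave both jars under flink-service/target/, which is the path the controller below refers to.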
Create a controller and add the code below. The key to submitting a Flink job through RESTClient is the createRemoteEnvironment method, which connects to a remote Flink environment; once you have that execution environment, calling env.execute() submits the job to the remote cluster for execution.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class FlinkController {

    @RequestMapping(value = "/test", method = RequestMethod.POST)
    public void test() throws Exception {
        // Jars shipped to the cluster with the job: the job class plus its Kafka dependencies
        String[] jars = {"flink-service/target/flink-service-1.0-SNAPSHOT-kafka.jar",
                "flink-service/target/flink-service-1.0-SNAPSHOT.jar"};

        // Connect to the remote Flink cluster (JobManager at 192.168.11.11:8081, parallelism 2)
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("192.168.11.11", 8081, 2, jars);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
        properties.setProperty("group.id", "flinksink");

        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
                new SimpleStringSchema(), properties));

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        }).print();

        env.execute();
    }
}
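Once the Spring Boot application is running, the submission can be triggered with a plain HTTP call; the port here assumes the Spring Boot default of 8080:

curl -X POST http://localhost:8080/flink/test

Note that env.execute() on a remote environment blocks until the job finishes, so for a long-running streaming job this request will not return; in a real service you would typically hand the submission off to a background thread or executor.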
I have verified that jobs submitted this way run on both a Flink standalone cluster and a YARN cluster. Write data into Kafka and you can see it appear in the Flink logs. Due to layout constraints, the screenshots for this article can be found in the article of the same name on my WeChat public account.
For the latest Flink news, follow my WeChat public account of the same name, Flink实战应用指南.
Recommended reading: Flink与TensorFlow整合详解 (a detailed guide to integrating Flink with TensorFlow)