A Detailed Guide to Submitting Flink Jobs via REST

Flink provides a rich set of client operations for submitting jobs. This article expands on the RESTful approach; for the other four approaches, see the dedicated session on client operations in the flink-china tutorial series.

Flink has supported job submission through the RESTClient since version 1.7, although many people may not be familiar with it. Submitting a job with the RESTClient means, in other words, that Flink can be driven through API calls: you submit the Flink code you have written to a Flink cluster, where it runs. This article demonstrates reading data from Kafka, with the job launched from a Spring Cloud microservice. The concrete steps are below.
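The core of this approach is StreamExecutionEnvironment.createRemoteEnvironment, which builds an execution environment that targets a running cluster instead of a local one. A minimal sketch of the call, with placeholder host, port, and jar path:

// host, port, and jar path are placeholders, not real addresses
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
    "jobmanager-host", 8081, "/path/to/job.jar");
env.execute(); // ships the jars and submits the job graph to the cluster

The full controller in the RESTful section below uses the overload that also takes a default parallelism.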

Writing the Flink Program

Create a Spring Boot parent project and add a flink-service module. Put the following code in flink-service, then package the project. The point of packaging is to bundle the jars the Flink job needs at runtime, so that they are shipped to the cluster together with the job when it is submitted through the RESTClient.

package com.flink.kafka;

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromsinkKafka {
  public static void main(String[] args) throws Exception {
    // create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    // consume the "flinktest" topic as a stream of strings
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));

    // log each record and pass it through unchanged
    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    env.execute();
  }
}

The packaging setup is shared below. In the pom, remove the default Spring Boot packaging plugin spring-boot-maven-plugin and replace it with the following. Packaging this way yields two jars, flink-service-1.0-SNAPSHOT-kafka.jar and flink-service-1.0-SNAPSHOT.jar: flink-service-1.0-SNAPSHOT-kafka.jar is the Flink code you wrote, while flink-service-1.0-SNAPSHOT.jar carries the Kafka base, client, and other classes your Flink program needs at runtime.

<build>
  <plugins>
    <!-- unpack the Kafka connector dependencies into target/classes -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <!-- for the kafka connector -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- for kafka base -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- for the kafka client -->
              <artifactItem>
                <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.1</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- build the job jar, recording the entry point in its manifest -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <executions>
        <!-- kafka -->
        <execution>
          <id>kafka</id>
          <phase>package</phase>
          <goals>
            <goal>jar</goal>
          </goals>
          <configuration>
            <classifier>kafka</classifier>
            <archive>
              <manifestEntries>
                <program-class>com.flink.kafka.ReadFromsinkKafka</program-class>
              </manifestEntries>
            </archive>
            <includes>
              <include>**/com/flink/kafka/ReadFromsinkKafka.class</include>
            </includes>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
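With this configuration in place, running mvn clean package on the parent project produces both jars under flink-service/target. A note on the design choice, as I understand it: spring-boot-maven-plugin is dropped because Spring Boot's repackaged "fat jar" nests application classes and dependencies under BOOT-INF, a layout Flink cannot load classes from, whereas the maven-dependency-plugin setup above unpacks the connector classes so they end up as plain class files in an ordinary jar.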

Implementing the RESTful Call

Create a controller and add the following code. The key to submitting a Flink job with the RESTClient is the createRemoteEnvironment method, which connects to the remote Flink environment; once you have obtained that execution environment, calling env.execute() submits the job to the remote cluster for execution.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class FlinkController {

  @RequestMapping(value = "/test", method = RequestMethod.POST)
  public void test() throws Exception {
    // jars to ship to the cluster: the job jar and its Kafka dependencies
    String[] jars = {"flink-service/target/flink-service-1.0-SNAPSHOT-kafka.jar",
        "flink-service/target/flink-service-1.0-SNAPSHOT.jar"};

    // connect to the remote cluster: JobManager host, REST port, default parallelism, jars
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("192.168.11.11", 8081, 2, jars);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));
    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    // submits the job to the remote cluster
    env.execute();
  }
}
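For completeness, a minimal Spring Boot entry point could look like the sketch below; the class name is illustrative rather than taken from the project.

package com.flink.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical bootstrap class, assumed for illustration.
@SpringBootApplication
public class FlinkServiceApplication {
  public static void main(String[] args) {
    SpringApplication.run(FlinkServiceApplication.class, args);
  }
}

With the application running, the job can then be submitted with a POST request, for example curl -X POST http://localhost:8080/flink/test (assuming the default server port 8080).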

Testing the Submission

I have verified that jobs submitted this way run on both a Flink standalone cluster and a YARN cluster. Write data into Kafka and you can see the records in the Flink logs. Because of layout constraints, the figures for this article can be viewed in the article of the same name on the public account.
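To produce test data, Kafka's console producer is convenient, e.g. bin/kafka-console-producer.sh --broker-list 192.168.11.12:9092 --topic flinktest (flags as of the Kafka 2.0.x line used above). Each line you type becomes one message, and since the job prints every record, the values should appear in the TaskManager stdout (*.out) logs.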

For the latest Flink news, you can follow my WeChat public account of the same name, Flink实战应用指南.

Recommended reading: Flink与TensorFlow整合详解 (a detailed guide to integrating Flink with TensorFlow).
