A Look at Flink's Queryable State

This article takes a look at Flink's Queryable State.

Example

Job

@Test
public void testValueStateForQuery() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE);
    env.addSource(new RandomTuple2Source())
            .keyBy(0) // key by the first field of the tuple
            .flatMap(new CountWindowAverage())
            .print();
    JobExecutionResult result = env.execute("testQueryableState");
    LOGGER.info("submit job result:{}", result);
}
  • This runs a job that keys the stream by the first field of the tuple and then applies a flatMap using CountWindowAverage.
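To make the per-key behaviour of this job concrete, here is a minimal plain-Java sketch with no Flink dependency of what keyBy(0) followed by CountWindowAverage computes. A HashMap stands in for the keyed ValueState. The class KeyedAverageSimulation is hypothetical and only illustrates the semantics. It does not show how Flink actually executes the job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of keyBy(0) + CountWindowAverage.
// A HashMap plays the role of the keyed ValueState: one {count, sum} pair per key.
final class KeyedAverageSimulation {

    private KeyedAverageSimulation() {}

    /** Feeds (key, value) records in order and returns the emitted (key, average) records. */
    static List<long[]> run(long[][] records) {
        Map<Long, long[]> stateByKey = new HashMap<>();
        List<long[]> emitted = new ArrayList<>();
        for (long[] record : records) {
            long key = record[0];   // keyBy(0): the first tuple field is the key
            long value = record[1];
            long[] current = stateByKey.get(key);       // like sum.value()
            if (current == null) {
                current = new long[] {1L, value};
            } else {
                current[0] += 1;
                current[1] += value;
            }
            stateByKey.put(key, current);               // like sum.update(currentSum)
            if (current[0] >= 2) {
                // integer (truncating) division, as in the Flink version
                emitted.add(new long[] {key, current[1] / current[0]});
                stateByKey.remove(key);                 // like sum.clear()
            }
        }
        return emitted;
    }
}
```

For example, the input (1,10), (2,4), (1,21), (2,6), (1,7) emits (1,15) and then (2,5). Note the integer division. The state for a key is cleared every time an average is emitted, so a query that arrives right after that can find no state for the key.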

CountWindowAverage

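import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {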

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(1L, input.f1);
        } else {
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
        }

        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}
  • CountWindowAverage declares its state as queryable by calling setQueryable("query-name") on the ValueStateDescriptor.

QueryableStateClient

@Test
public void testQueryStateByJobId() throws InterruptedException, IOException {
    // get the jobId from the running-job page of the Flink web UI
    JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e");
    long key = 1L;
    // 9069 is the proxy port (query.proxy.ports); see QueryableStateOptions in flink-core 1.7.0
    QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069);
    try {
        // the state descriptor of the state to be fetched
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

        CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
                client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

        LOGGER.info("get kv state return future, waiting......");
        // if there is no state for the key, join() throws a CompletionException caused by e.g.
        // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
        ValueState<Tuple2<Long, Long>> res = resultFuture.join();
        LOGGER.info("query result:{}", res.value());
    } finally {
        // release the client's network resources even if the query fails
        client.shutdownAndWait();
    }
}
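  • Here the QueryableStateClient connects to a QueryableStateClientProxy to query the state. The jobId can be looked up in the web UI after the job has been submitted, and is then converted to a JobID object with JobID.fromHexString.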
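Because the job ID is copied by hand from the web UI, a quick sanity check before calling JobID.fromHexString can catch stray whitespace or truncated strings. The sketch assumes only that a Flink JobID is 16 bytes, written as 32 hex characters. JobIdHex is a hypothetical helper and not part of Flink. JobID.fromHexString still does the real parsing.

```java
// Hypothetical helper (not part of Flink): checks that a job ID copied from the web UI
// is a 32-character hex string, i.e. the 16 bytes of a JobID, and decodes it.
final class JobIdHex {

    static final int JOB_ID_BYTES = 16;

    private JobIdHex() {}

    static byte[] toBytes(String hex) {
        String trimmed = hex.trim(); // copied IDs often carry surrounding whitespace
        if (trimmed.length() != 2 * JOB_ID_BYTES) {
            throw new IllegalArgumentException("expected 32 hex characters, got " + trimmed.length());
        }
        byte[] bytes = new byte[JOB_ID_BYTES];
        for (int i = 0; i < JOB_ID_BYTES; i++) {
            int high = Character.digit(trimmed.charAt(2 * i), 16);
            int low = Character.digit(trimmed.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("not a hex string: " + trimmed);
            }
            bytes[i] = (byte) ((high << 4) | low);
        }
        return bytes;
    }
}
```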

Summary

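  • Queryable State is currently a beta feature and is not enabled by default in the Flink 1.7 distribution. To enable it, copy flink-queryable-state-runtime_2.11-1.7.0.jar from the distribution's opt/ folder into the /opt/flink/lib/ directory. On startup, the task manager then logs a line such as Started Queryable State Proxy Server @ /172.20.0.3:9069, which confirms that the feature is enabled.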
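  • Architecturally, Queryable State involves three components. The QueryableStateServer runs on each task manager and serves the locally stored state. The QueryableStateClientProxy also runs on each task manager. It receives query requests from clients, fetches the requested state from the task manager that holds it, and returns it to the client. The QueryableStateClient usually runs outside the Flink cluster and submits the user's state queries.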
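  • Both the QueryableStateServer and the QueryableStateClientProxy have configurable ports, network-threads and query-threads properties (under query.server.* and query.proxy.* respectively). The QueryableStateServer's default query.server.ports value is 9067. The QueryableStateClientProxy's default query.proxy.ports value is 9069, and the client must send its requests to this port.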
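  • There are two ways to declare state as queryable. One is KeyedStream.asQueryableState, which turns the stream into a QueryableStateStream. The other is calling setQueryable on the StateDescriptor of managed keyed state. The difference is that asQueryableState must be applied directly to a KeyedStream and acts like a sink, so no further transformations can follow it. Declaring state through setQueryable on a StateDescriptor is more flexible. Note that there is no queryable ListState variant (no ListState sink for asQueryableState).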
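  • Queryable State currently has several limitations. First, its life cycle is bound to that of the task: when the task finishes, the state is disposed of and can no longer be queried. Querying after a task has finished may be supported later. Second, KvState notifications currently use a simple tell, which may later be replaced by an ask/acknowledgement scheme. Third, query statistics are disabled by default and may later be published to the metrics system.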
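The three components in the architecture point can be pictured with a toy in-memory model. ToyStateServer and ToyClientProxy are hypothetical classes, not Flink's implementation. The proxy here picks a server by hashing the key, which simplifies how Flink actually routes keys to task managers (Flink uses key groups). The calling code plays the role of the QueryableStateClient.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of the Queryable State roles; hypothetical classes, not Flink's implementation.

/** Plays the QueryableStateServer of one task manager: serves only the state stored locally. */
final class ToyStateServer {
    // queryable state name -> key -> value
    private final Map<String, Map<Long, String>> localState = new HashMap<>();

    /** Called on the "operator" side when state for a key is written on this task manager. */
    void put(String queryableName, long key, String value) {
        localState.computeIfAbsent(queryableName, name -> new HashMap<>()).put(key, value);
    }

    Optional<String> lookup(String queryableName, long key) {
        Map<Long, String> byKey = localState.getOrDefault(queryableName, Collections.emptyMap());
        return Optional.ofNullable(byKey.get(key));
    }
}

/** Plays the QueryableStateClientProxy: accepts a query and forwards it to the server owning the key. */
final class ToyClientProxy {
    private final List<ToyStateServer> servers;

    ToyClientProxy(List<ToyStateServer> servers) {
        this.servers = servers;
    }

    /** Simplified routing by key hash; Flink really assigns keys to tasks via key groups. */
    ToyStateServer serverFor(long key) {
        return servers.get(Math.floorMod(Long.hashCode(key), servers.size()));
    }

    Optional<String> query(String queryableName, long key) {
        return serverFor(key).lookup(queryableName, key);
    }
}
```

In this toy, an unknown key or queryable name yields an empty Optional. The real client instead completes its future exceptionally, for example with the UnknownKeyOrNamespaceException mentioned in the client code.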
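The setup checks from the summary can be scripted: confirming that the proxy started (the log line in the first point) and knowing which ports the proxy may bind (the ports setting in the third point). This sketch assumes the port setting format that the query.*.ports options accept: a single port, a range such as 50100-50200, or a comma-separated list of both. QueryableStateSetupCheck and its methods are hypothetical helpers, not Flink APIs. Flink parses these settings itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helpers (not Flink APIs) for checking a Queryable State setup.
final class QueryableStateSetupCheck {

    // Matches e.g. "Started Queryable State Proxy Server @ /172.20.0.3:9069";
    // an optional "hostname/" prefix before the IP is tolerated.
    private static final Pattern PROXY_STARTED = Pattern.compile(
            "Started Queryable State Proxy Server @ (?:[^\\s/]*/)?([^\\s:/]+):(\\d+)");

    private QueryableStateSetupCheck() {}

    /** Expands "9069", "50100-50200" or "50100-50200,51234" into the individual candidate ports. */
    static List<Integer> expandPorts(String setting) {
        List<Integer> ports = new ArrayList<>();
        for (String part : setting.split(",")) {
            String range = part.trim();
            int dash = range.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(range)); // single port
            } else {
                int from = Integer.parseInt(range.substring(0, dash).trim());
                int to = Integer.parseInt(range.substring(dash + 1).trim());
                for (int p = from; p <= to; p++) {
                    ports.add(p); // every port in the inclusive range
                }
            }
        }
        return ports;
    }

    /** Returns "host:port" if the log line reports that the proxy started, otherwise null. */
    static String proxyAddress(String logLine) {
        Matcher m = PROXY_STARTED.matcher(logLine);
        return m.find() ? m.group(1) + ":" + m.group(2) : null;
    }
}
```

For instance, expandPorts("9069") yields the single port 9069, which is the port the client code in this article connects to.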

