参数可使用构造函数或者withParameters(Configuration)方法传递,参数将会做为函数对象的一部分被序列化并传递到task实例中!官网地址batchhtml
1 使用构造函数方式apache
DataSet<Integer> toFilter = env.fromElements(1, 2, 3); toFilter.filter(new MyFilter(2)); private static class MyFilter implements FilterFunction<Integer> { private final int limit; public MyFilter(int limit) { this.limit = limit; } @Override public boolean filter(Integer value) throws Exception { return value > limit; } }
2 withParameters(Configuration)方式api
这个方法将会携带一个Configuration对象做为参数,这个参数将会传递给Rich Function的open方法(关于Rich Function参见:rich function)。Configuration对象是一个Map,存储Key/Value键值对.ide
DataSet<Integer> toFilter = env.fromElements(1, 2, 3); Configuration config = new Configuration(); config.setInteger("limit", 2); toFilter.filter(new RichFilterFunction<Integer>() { private int limit; @Override public void open(Configuration parameters) throws Exception { limit = parameters.getInteger("limit", 0); } @Override public boolean filter(Integer value) throws Exception { return value > limit; } }).withParameters(config);
3 使用全局的the ExecutionConfig方式函数
参数能够被全部的rich function得到this
Configuration conf = new Configuration(); conf.setString("mykey","myvalue"); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(conf); public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private String mykey; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); Configuration globConf = (Configuration) globalParams; mykey = globConf.getString("mykey", null); } // ... more here ...