Operators transform one or more DataStreams into a new DataStream.
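import org.apache.flink.streaming.api.scala._

object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    // CustomNonParallelSourceFunction is one of the custom sources defined earlier
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      // log every element, then pass it through unchanged
      println("received:" + x)
      x
    }).filter(_ % 2 == 0) // keep even values only
      .print().setParallelism(1)
  }
}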
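Any of the data sources defined earlier will work as the source here.
The map in this example does no real work; it only logs each element and passes it on. The filter takes every number modulo 2 and keeps the even ones. The printed result is: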
received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8
This shows that map receives every element, while filter is where the elements are actually filtered out.
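To see why the "received:" lines and the surviving values interleave, here is a minimal sketch in plain Java. It is only an analogy: `java.util.stream` stands in for the Flink pipeline, and the class name `MapFilterSketch` and method `run` are made up for this illustration. As in the job above, each element passes through map and then filter before the next element is handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Plain-Java analogy only: java.util.stream stands in for the Flink pipeline.
final class MapFilterSketch {

    // Sends 1..max through map (log, pass through) and filter (keep evens),
    // recording every line that would be printed, in order.
    static List<String> run(long max) {
        List<String> lines = new ArrayList<>();
        LongStream.rangeClosed(1, max)
                .map(x -> {
                    lines.add("received:" + x); // map sees every element
                    return x;                   // and forwards it unchanged
                })
                .filter(x -> x % 2 == 0)        // only even values pass
                .forEach(x -> lines.add(String.valueOf(x)));
        return lines;
    }

    public static void main(String[] args) {
        run(8).forEach(System.out::println);
    }
}
```

Running `main` prints the same twelve lines as the Flink job's output shown above.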
// Requires MapFunction and FilterFunction from org.apache.flink.api.common.functions.
public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    // Parallel source: pin it to one instance so each value is emitted only once.
    data.setParallelism(1)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received:" + value);
                return value;
            }
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0; // keep even values only
            }
        })
        .print()
        .setParallelism(1);
}
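You need to call data.setParallelism(1) before the map operation; otherwise each value is output multiple times. This is because we are using JavaCustomParallelSourceFunction() here. With JavaCustomNonParallelSourceFunction the parallelism is already 1 by default, so there is no need to set it.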
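The duplicated output comes from every parallel instance of the source running its own copy of the emit loop. The sketch below simulates that in plain Java, with no Flink involved; `ParallelSourceSketch` and `emit` are hypothetical names for this illustration. It also assumes the instances advance in lockstep, whereas real subtasks run concurrently and their output order is not fixed.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: models N copies of a source that each emit 1..count.
final class ParallelSourceSketch {

    // Returns everything downstream would see when `parallelism` copies of
    // the source each emit the full sequence 1..count.
    static List<Long> emit(int parallelism, long count) {
        List<Long> out = new ArrayList<>();
        for (long value = 1; value <= count; value++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                out.add(value); // every subtask emits the same value
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(1, 3)); // one instance: each value once
        System.out.println(emit(4, 3)); // four instances: each value four times
    }
}
```

With a parallelism of 1 the downstream operators see each value once, which is what setParallelism(1) on the source achieves.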
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  data01.union(data02).print().setParallelism(1)
}
The union operation merges the two streams into one so that they can be processed together. The code above prints:
1
1
2
2
3
3
4
4
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    data1.union(data2).print().setParallelism(1);
}
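Each number appears twice in the union output because union keeps every element from both inputs and removes no duplicates. The plain-Java sketch below mimics that under one assumption: both sources emit at the same pace, so their elements alternate. `UnionSketch` and its `union` method are illustrative names and not the Flink API. In a real job the interleaving depends on arrival timing, so the exact order should not be relied on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Analogy only: merges two in-memory lists the way two equally paced
// streams would be merged, keeping duplicates.
final class UnionSketch {

    // Alternates between the two inputs; nothing is dropped or deduplicated.
    static List<Long> union(List<Long> first, List<Long> second) {
        List<Long> out = new ArrayList<>();
        int longest = Math.max(first.size(), second.size());
        for (int i = 0; i < longest; i++) {
            if (i < first.size()) {
                out.add(first.get(i));
            }
            if (i < second.size()) {
                out.add(second.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> source = Arrays.asList(1L, 2L, 3L, 4L);
        System.out.println(union(source, source));
    }
}
```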
split breaks one stream into several named streams, and select picks which of those streams to process.
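import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  val split = data.split(new OutputSelector[Long] {
    // Tag each value with the name of the output it belongs to.
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  // Selecting both names prints every element.
  split.select("odd", "even").print().setParallelism(1)
}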
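Data can then be processed according to the selected names. Here both "odd" and "even" are selected, so every element is printed.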
public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            // Tag each value with the name of the output it belongs to.
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    // Only the values tagged "odd" are printed.
    split.select("odd").print().setParallelism(1);
}
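The tag-then-select idea can be tried without a Flink cluster. The following is a minimal sketch in plain Java, assuming the same "odd" and "even" names as above; `SplitSelectSketch`, `tag`, and `select` are hypothetical helpers written for this illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Analogy only: tags values by name and keeps those whose tag was selected.
final class SplitSelectSketch {

    // Plays the role of OutputSelector.select: names the output for a value.
    static List<String> tag(long value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    // Keeps, in input order, the values carrying at least one requested name.
    static List<Long> select(List<Long> input, String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<Long> out = new ArrayList<>();
        for (long value : input) {
            for (String name : tag(value)) {
                if (wanted.contains(name)) {
                    out.add(value);
                    break; // add each value at most once
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(select(input, "odd"));         // odd values only
        System.out.println(select(input, "odd", "even")); // every value
    }
}
```

Selecting a single name narrows the output to that group, while selecting all names behaves like the unsplit stream.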