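When consuming streaming data, you often need to split one stream into several others and then process each resulting stream differently.
For example: build a real-time product stream in which every product carries a season tag. Products with different tags need their own statistics, so the product stream has to be split by season tag.
First, simulate a real-time data stream. The Product entity: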
```java
package common;

import lombok.Data;

// @Data generates getters, setters, equals/hashCode and toString
@Data
public class Product {
    private Integer id;
    private String seasonType;
}
```
A custom source that emits one random product per second:
```java
package source;

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ProductStremingSource implements SourceFunction<Product> {

    private static final List<String> SEASONS = Arrays.asList("spring", "summer", "autumn", "winter");

    // volatile: cancel() may be called from a different thread than run()
    private volatile boolean isRunning = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Product> ctx) throws Exception {
        while (isRunning) {
            // emit one product per second
            ctx.collect(generateProduct());
            Thread.sleep(1000);
        }
    }

    private Product generateProduct() {
        Product product = new Product();
        product.setId(random.nextInt(100));
        product.setSeasonType(SEASONS.get(random.nextInt(SEASONS.size())));
        return product;
    }

    @Override
    public void cancel() {
        // make the loop in run() exit
        isRunning = false;
    }
}
```
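Flink calls run() on the source and cancel() when the job is cancelled; the SourceFunction documentation recommends a volatile isRunning flag that cancel() sets to false, which is why cancel() above must not stay empty. The sketch below is a plain-Java simulation of that contract only (no Flink dependency; CancellableGenerator and runFor are hypothetical names, and a String season tag stands in for a Product):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java simulation of the SourceFunction run()/cancel() contract.
// No Flink classes are involved; CancellableGenerator is a hypothetical name.
final class CancellableGenerator implements Runnable {
    private static final String[] SEASONS = {"spring", "summer", "autumn", "winter"};

    // volatile so that a cancel() issued on another thread is seen by run()
    private volatile boolean isRunning = true;
    private final Random random = new Random();
    private final Consumer<String> collector;
    private final long intervalMillis;

    CancellableGenerator(Consumer<String> collector, long intervalMillis) {
        this.collector = collector;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (isRunning) {
            // emit one random season tag, then wait (the Flink source waits 1 s)
            collector.accept(SEASONS[random.nextInt(SEASONS.length)]);
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // treat an interrupt as a stop request as well
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs a generator on its own thread for about `millis` ms, cancels it from
    // the calling thread and returns everything it emitted.
    static List<String> runFor(long millis) {
        List<String> emitted = new CopyOnWriteArrayList<>();
        CancellableGenerator generator = new CancellableGenerator(emitted::add, 10);
        Thread worker = new Thread(generator, "product-source");
        worker.start();
        try {
            Thread.sleep(millis);
            generator.cancel();
            worker.join(1000); // the loop exits within one interval
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("generator did not stop after cancel()");
        }
        return new ArrayList<>(emitted);
    }
}
```

With an empty cancel() the loop would never end, and without volatile the Java memory model does not guarantee that the worker thread ever sees the flag change.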
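Output:
Splitting with Filter: apply the filter operator once per target stream; each filter keeps only the records whose field (here seasonType) matches its condition.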
```java
import common.Product;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import source.ProductStremingSource;

public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // Split with filter: one filter per season
        SingleOutputStreamOperator<Product> spring = source.filter(product -> "spring".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> summer = source.filter(product -> "summer".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> autumn = source.filter(product -> "autumn".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> winter = source.filter(product -> "winter".equals(product.getSeasonType()));

        source.print();
        winter.printToErr();

        env.execute("output");
    }
}
```
Result (the red lines, printed to stderr, are the split stream for products tagged winter):
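Each filter() call above creates its own operator, and every one of them receives every element of source, so with four seasons each product is tested four times while only one test can pass. A plain-Java model that counts those predicate calls (no Flink classes; FilterFanOut is a hypothetical name and a String season tag stands in for a Product):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of filter-based splitting (no Flink classes).
// Every branch scans the whole input, as every Flink filter operator
// receives every element of the upstream stream.
final class FilterFanOut {
    final Map<String, List<String>> branches = new LinkedHashMap<>();
    int predicateCalls = 0;

    static FilterFanOut split(List<String> input, List<String> branchNames) {
        FilterFanOut result = new FilterFanOut();
        for (String name : branchNames) {
            List<String> kept = new ArrayList<>();
            for (String seasonType : input) {
                result.predicateCalls++;          // one predicate call per element per branch
                if (name.equals(seasonType)) {
                    kept.add(seasonType);
                }
            }
            result.branches.put(name, kept);
        }
        return result;
    }
}
```

For 5 products and 4 branches this makes 20 predicate calls for 5 matches. The overhead is small for cheap predicates but grows with the number of branches, whereas side outputs look at each element only once.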
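Splitting with Split: implement the select() method of an OutputSelector (here an anonymous inner class) so that it puts each record under one or more names according to how it should be split. split() returns a SplitStream, and SplitStream.select(name) then picks out the stream for a given name.
Splitting once this way works, but it cannot be used for consecutive splits, i.e. splitting an already split stream again.
SplitStream has been marked as deprecated, and split() is no longer available in later Flink releases.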
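```java
import common.Product;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import source.ProductStremingSource;

import java.util.ArrayList;
import java.util.List;

// Note: split()/select() are deprecated and only exist in older Flink versions
public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // Split with split(): name each product after its season
        SplitStream<Product> dataSelect = source.split(new OutputSelector<Product>() {
            @Override
            public Iterable<String> select(Product product) {
                List<String> seasonTypes = new ArrayList<>();
                String seasonType = product.getSeasonType();
                switch (seasonType) {
                    case "spring":
                    case "summer":
                    case "autumn":
                    case "winter":
                        seasonTypes.add(seasonType);
                        break;
                    default:
                        break;
                }
                return seasonTypes;
            }
        });

        // select() must use the same names that OutputSelector.select() returns
        DataStream<Product> spring = dataSelect.select("spring");
        DataStream<Product> summer = dataSelect.select("summer");
        DataStream<Product> autumn = dataSelect.select("autumn");
        DataStream<Product> winter = dataSelect.select("winter");

        source.print();
        winter.printToErr();

        env.execute("output");
    }
}
```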
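The names passed to SplitStream.select() only match what OutputSelector.select() returned. A plain-Java model of that matching (no Flink classes; SplitModel is a hypothetical name and a String season tag stands in for a Product): the selector gives each element zero or more names, select(name) keeps the elements carrying that name, and a name the selector never returns, such as "machine", yields an empty stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the deprecated split()/select() pair (no Flink classes).
// The selector gives each element zero or more output names; select(name)
// keeps only the elements whose names include that name.
final class SplitModel<T> {
    private final List<T> elements;
    private final Function<T, Iterable<String>> selector;

    SplitModel(List<T> elements, Function<T, Iterable<String>> selector) {
        this.elements = elements;
        this.selector = selector;
    }

    List<T> select(String name) {
        List<T> selected = new ArrayList<>();
        for (T element : elements) {
            for (String assigned : selector.apply(element)) {
                if (assigned.equals(name)) {
                    selected.add(element);
                    break;                  // add each element at most once
                }
            }
        }
        return selected;
    }
}
```

Because the selector returns an Iterable, one element can also be routed to several named streams at once, e.g. its season plus a catch-all name.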
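Splitting with Side Output: this is the recommended approach.
First, define an OutputTag to identify each output stream. Create it as an anonymous subclass, e.g. new OutputTag<Product>("spring") {}, so that Flink can determine the element type.
Records can then be emitted to a side output from any of the following functions, by calling ctx.output(OutputTag, value) on the Context they receive:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
Finally, call getSideOutput(OutputTag) on the resulting SingleOutputStreamOperator to obtain each stream.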
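```java
import common.Product;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import source.ProductStremingSource;

public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // Split with side outputs. The trailing {} makes each OutputTag an anonymous
        // subclass so that Flink can extract its element type (Product).
        final OutputTag<Product> spring = new OutputTag<Product>("spring") {};
        final OutputTag<Product> summer = new OutputTag<Product>("summer") {};
        final OutputTag<Product> autumn = new OutputTag<Product>("autumn") {};
        final OutputTag<Product> winter = new OutputTag<Product>("winter") {};

        SingleOutputStreamOperator<Product> sideOutputData = source.process(new ProcessFunction<Product, Product>() {
            @Override
            public void processElement(Product product, Context ctx, Collector<Product> out) throws Exception {
                // route each product to exactly one side output by its season tag
                switch (product.getSeasonType()) {
                    case "spring":
                        ctx.output(spring, product);
                        break;
                    case "summer":
                        ctx.output(summer, product);
                        break;
                    case "autumn":
                        ctx.output(autumn, product);
                        break;
                    case "winter":
                        ctx.output(winter, product);
                        break;
                    default:
                        // unknown season: keep it in the main output
                        out.collect(product);
                }
            }
        });

        DataStream<Product> springStream = sideOutputData.getSideOutput(spring);
        DataStream<Product> summerStream = sideOutputData.getSideOutput(summer);
        DataStream<Product> autumnStream = sideOutputData.getSideOutput(autumn);
        DataStream<Product> winterStream = sideOutputData.getSideOutput(winter);

        // print the stream tagged winter
        winterStream.print();

        env.execute("output");
    }
}
```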
Result:
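The ProcessFunction in the side-output example inspects each product once and sends it to exactly one destination: a tagged side output, or the main output via out.collect. A plain-Java model of that routing (no Flink classes; SideOutputModel is a hypothetical name, a String id stands in for an OutputTag and a season String for a Product):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of side-output routing (no Flink classes).
// Each element is inspected exactly once: it goes to the side output whose
// tag id matches its season, or to the main output if no tag matches.
final class SideOutputModel {
    final Map<String, List<String>> sideOutputs = new LinkedHashMap<>();
    final List<String> mainOutput = new ArrayList<>();
    int inspections = 0;

    static SideOutputModel route(List<String> seasonTypes, Set<String> tagIds) {
        SideOutputModel model = new SideOutputModel();
        for (String id : tagIds) {
            model.sideOutputs.put(id, new ArrayList<>());
        }
        for (String seasonType : seasonTypes) {
            model.inspections++;                       // one look per element
            List<String> target = model.sideOutputs.get(seasonType);
            if (target != null) {
                target.add(seasonType);                // like ctx.output(tag, value)
            } else {
                model.mainOutput.add(seasonType);      // like out.collect(value)
            }
        }
        return model;
    }

    // Counterpart of getSideOutput(tag); an unknown id yields an empty list.
    List<String> getSideOutput(String tagId) {
        return sideOutputs.getOrDefault(tagId, Collections.emptyList());
    }
}
```

Unlike the filter approach, the number of inspections equals the number of elements no matter how many tags exist. Looking the tag up in a Map instead of a switch is a choice made for this sketch; in a Flink job the analogue would be a Map<String, OutputTag<Product>> captured by the ProcessFunction, which is an untested suggestion here.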
More articles: www.ipooli.com