This article gives a brief introduction to the join operation in Kafka Streams.
A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
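To make the matching rule concrete, here is a minimal plain-JDK sketch of what a windowed inner join decides for each pair of records. It is not Kafka Streams code: the `Rec` type, the `join` method, and the timestamps are hypothetical names and values chosen for illustration, and a real implementation would keep records in windowed state stores rather than loop over lists.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration of windowed inner-join semantics; not Kafka Streams code.
class WindowedJoinSketch {

    // Hypothetical record type: key, value, and event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Pairs every left record with every right record that has the same key
    // and a timestamp at most windowMs away (earlier or later).
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(l.key + " , " + l.value + "--" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = new ArrayList<>();
        left.add(new Rec("a", "1,a", 0L));
        left.add(new Rec("b", "2,b", 1_000L));

        List<Rec> right = new ArrayList<>();
        right.add(new Rec("a", "a,hello", 10_000L)); // 10 s after the left "a": joins
        right.add(new Rec("b", "b,world", 40_000L)); // 39 s after the left "b": outside the window

        // Prints: a , 1,a--a,hello
        for (String line : join(left, right, 30_000L)) {
            System.out.println(line);
        }
    }
}
```

Because only records within the window can still match, anything older can be discarded, which is what keeps the amount of buffered state bounded.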
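    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.ValueJoiner;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> left = builder.stream("intpu-left");
    KStream<String, String> right = builder.stream("intpu-right");

    // Re-key both streams on the shared letter field, then inner-join within a 30 s window
    KStream<String, String> all = left.selectKey((key, value) -> value.split(",")[1])
            .join(right.selectKey((key, value) -> value.split(",")[0]),
                    new ValueJoiner<String, String, String>() {
                        @Override
                        public String apply(String value1, String value2) {
                            // Concatenate the left and right values of each matched pair
                            return value1 + "--" + value2;
                        }
                    },
                    JoinWindows.of(30000));
    // Print every joined record to stdout
    all.print();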
Because a join matches records by key, you usually need to re-map the key of each stream first.
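The key re-mapping can be tried in isolation. The sketch below pulls the two `selectKey` lambdas out into plain static methods so their effect on the sample records is easy to see; the class and method names are hypothetical and the code needs only the JDK.

```java
// Plain-JDK illustration of the key extraction done by the two selectKey calls.
class JoinKeySketch {

    // Left records look like "1,a": the join key is the second field.
    static String leftKey(String value) {
        return value.split(",")[1];
    }

    // Right records look like "a,hello": the join key is the first field.
    static String rightKey(String value) {
        return value.split(",")[0];
    }

    public static void main(String[] args) {
        // Both sides resolve to the same key, so these two records can join.
        // Prints: a / a
        System.out.println(leftKey("1,a") + " / " + rightKey("a,hello"));
    }
}
```

Note that a left record without a comma would make `split(",")[1]` throw an `ArrayIndexOutOfBoundsException`, so malformed input may need to be filtered out before re-keying.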
    sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
    sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right
On the left topic, enter records such as:
    1,a
    2,b
    3,c
    3,c
    4,d
    1,a
    2,b
    3,c
    1,a
    2,b
    3,c
    4,e
    5,h
    6,f
    7,g
On the right topic, enter records such as:
    a,hello
    b,world
    c,hehehe
    c,aaa
    d,eee
    a,cccc
    b,aaaaaa
    c,332435
    a,dddd
    b,2324
    c,ddddd
    e,23453
    h,2222222
    f,0o0o0o0
    g,ssss
Sample output:
    [KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
    [KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
    2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
    2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
    2017-10-17 22:17:34.585 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
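The example above uses an inner join; left join and outer join are also available. If you need to capture records that found no match within the time window, you can use an outer join, store its results separately, and then filter them against the records that did match.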
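The second filtering step can be sketched with the JDK alone. The example below assumes the joined values were produced by the `value1 + "--" + value2` joiner, so a missing side shows up as the literal text `null`, and that the values themselves never contain `--`. The class and method names are hypothetical, and collecting the outer-join results into a list is only one possible way to "store them separately".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-JDK illustration: keep only outer-join results that never found a partner.
class UnmatchedFilterSketch {

    static List<String> unmatched(List<String> joinedValues) {
        // First pass: remember both sides of every fully matched result.
        Set<String> matchedLeft = new HashSet<>();
        Set<String> matchedRight = new HashSet<>();
        for (String v : joinedValues) {
            String[] sides = v.split("--", 2);
            if (!sides[0].equals("null") && !sides[1].equals("null")) {
                matchedLeft.add(sides[0]);
                matchedRight.add(sides[1]);
            }
        }

        // Second pass: keep half-empty results whose non-null side never matched.
        List<String> out = new ArrayList<>();
        for (String v : joinedValues) {
            String[] sides = v.split("--", 2);
            boolean leftMissing = sides[0].equals("null");
            boolean rightMissing = sides[1].equals("null");
            if (leftMissing && !rightMissing && !matchedRight.contains(sides[1])) {
                out.add(v);
            } else if (rightMissing && !leftMissing && !matchedLeft.contains(sides[0])) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> joined = Arrays.asList(
                "null--f,ddddddd", // right side arrived first ...
                "4,f--f,ddddddd",  // ... and was matched later
                "5,g--null",       // left side never matched
                "6,h--null",       // left side arrived first ...
                "6,h--h,ddddddd"); // ... and was matched later

        // Prints: [5,g--null]
        System.out.println(unmatched(joined));
    }
}
```

One limitation of this sketch: it compares values only, so if the same value is sent twice and only one copy finds a partner, the unmatched copy is filtered out as well.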
Sample output with an outer join:
    [KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
    [KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
    2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
    2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
    2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
    2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_0
    2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
    2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_1
    2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_0
    2017-10-17 22:31:12.539 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_2
    2017-10-17 22:31:12.540 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_1
    2017-10-17 22:31:12.541 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_2
    [KSTREAM-MERGE-0000000014]: g , 5,g--null
    [KSTREAM-MERGE-0000000014]: h , 6,h--null
    [KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd
The Kafka Streams join operation is well suited to matching records from different data sources in real time.