本文主要演示一下storm drpc实例html
version: '2' services: supervisor: image: storm container_name: supervisor command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774 depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ports: - 6700:6700 - 6701:6701 - 6702:6702 - 6703:6703 - 8000:8000 drpc: image: storm container_name: drpc command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774 depends_on: - nimbus - supervisor - zookeeper links: - nimbus - supervisor - zookeeper restart: always ports: - 3772:3772 - 3773:3773 - 3774:3774
好让外部的DRPCClient访问
)、drpc.invocations.port(让worker访问
)@Test public void testDeployDRPCStateQuery() throws InterruptedException, TException { TridentTopology topology = new TridentTopology(); FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) //NOTE transforms a Stream into a TridentState object .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6); topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); StormTopology stormTopology = topology.build(); //远程提交 mvn clean package -Dmaven.test.skip=true //storm默认会使用System.getProperty("storm.jar")去取,若是不设定,就不能提交 System.setProperty("storm.jar",TOPOLOGY_JAR); Config conf = new Config(); conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus链接主机地址,好比:192.168.10.1 conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus链接端口,默认 6627 conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper链接主机地址,能够使用集合存放多个 conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper链接端口,默认2181 StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology); }
@Test public void testLaunchDrpcClient() throws TException { Config conf = new Config(); //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,否则client直接跑空指针 conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName()); conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3); conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000); conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000); conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772); System.out.println(client.execute("words", "cat dog the man")); }
DRPCClient使用的是thrift协议调用
)