Please credit the author and source when reposting.
A business requirement called for integrating Storm and Kafka into a Spring Boot project: other services write their logs to a Kafka topic, and Storm processes that topic in real time for data monitoring and statistics. Tutorials on this are scarce, so this post covers how to integrate Storm + Kafka into Spring Boot, along with the pitfalls I ran into.
1. Java: JDK 1.8
2. IDE: IntelliJ IDEA 2017
3. Maven for project management
4. Spring Boot 1.5.8.RELEASE
Spring Boot manages the beans needed by Kafka, Storm, Redis, and so on. Other services collect their logs into Kafka, Kafka delivers the logs to Storm in real time, and the Storm bolts perform the corresponding processing and statistics. The problems encountered were:
1. Spring Boot has no ready-made Storm integration.
2. When starting via Spring Boot, it was unclear how to trigger the topology submission.
3. Submitting the topology failed with a Nimbus-not-found-on-localhost error.
4. Bolts cannot obtain Spring-managed bean instances through annotations.
Before integrating, you need to know how Spring Boot starts and is configured (this post assumes you already know and use Storm, Kafka, and Spring Boot).
There are very few examples online of integrating Storm with Spring Boot, but since the requirement exists, we integrate it ourselves.
First, import the required dependencies:

```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>spring-boot-actuator</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>jettison</artifactId>
<groupId>org.codehaus.jettison</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-jaxrs</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-xc</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-auth</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<!--storm-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>${provided.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.1</version>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
```
The exclusions above are needed because of transitive-dependency conflicts with the rest of the project's build. The Storm version is 1.1.0. The Spring Boot related dependencies are:
```xml
<!-- spring boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
```
PS: this dependency list reflects this particular project's needs; it is not minimal, and is provided only as a reference.
Project structure:
config — configuration files for the different environments
On starting Spring Boot, we run into the problem of how to get the topology submitted at all.
My first idea was: start Spring Boot → create a Kafka listener on the topic → have the listener start the topology. The problem is that the Kafka listener would re-trigger the topology every time a message arrived on that topic, which is clearly not what we want. After some digging I found that Spring can run a method once the application context has finished starting, which was exactly what I needed. The flow for triggering the topology therefore becomes:
start Spring Boot → run the trigger method → complete the topology submission.
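One caveat worth hedging: in applications with a parent/child context hierarchy, `ContextRefreshedEvent` can fire more than once, which would submit the topology twice. A minimal, framework-free sketch of a run-once guard (the class name `OneShotTrigger` is hypothetical, not from this project):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Runs a given action at most once, no matter how many times fire() is called.
// In the real project, fire() would be called from onApplicationEvent(...) and
// the action would be the topology submission.
public class OneShotTrigger {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable action;

    public OneShotTrigger(Runnable action) {
        this.action = action;
    }

    public void fire() {
        // compareAndSet guarantees the action runs exactly once even if the
        // refresh event is delivered again or from another thread.
        if (started.compareAndSet(false, true)) {
            action.run();
        }
    }
}
```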
The listener class is built as follows:

```java
/**
 * @author Leezer
 * @date 2017/12/28
 * Submits the topology automatically once Spring has finished loading.
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

    private static String BROKERZKSTR;
    private static String TOPIC;
    private static String HOST;
    private static String PORT;

    public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                    @Value("${zookeeper.host}") String host,
                    @Value("${zookeeper.port}") String port,
                    @Value("${kafka.default-topic}") String topic) {
        BROKERZKSTR = brokerZkstr;
        HOST = host;
        TOPIC = topic;
        PORT = port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            // Instantiate the TopologyBuilder.
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Set up the spout; the parallelism hint controls the number of
            // threads this component gets in the cluster.
            BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper
            // node path and consumer id.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Collections.singletonList(HOST);
            spoutConfig.zkPort = Integer.parseInt(PORT);
            // Start reading from the latest Kafka offset.
            spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
            topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2)
                    .shuffleGrouping("kafka-spout");
            Config config = new Config();
            config.setDebug(false);
            /* Number of worker slots this topology claims on the cluster. One slot
               corresponds to one worker process on a supervisor node. If you request
               more slots than the cluster has free (say the cluster has 2 workers
               left and you ask for 4), the topology may submit successfully but
               never actually run; once you kill other topologies and slots are
               freed, it will start running normally. */
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Note: during startup you may hit:

```
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
```
This happens because one of the imported jars pulls in a servlet-api older than the one embedded in Tomcat. Open the Maven dependency tree and exclude it:

```xml
<exclusion>
    <artifactId>servlet-api</artifactId>
    <groupId>javax.servlet</groupId>
</exclusion>
```

Then restart and it works.
During startup you may also get:

```
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
```
I puzzled over this for a long time. The explanations online all blame a bad Storm configuration, but my Storm is deployed on a server with no local configuration in the project, so in theory it should have been reading the server's configuration. After several failed attempts I finally realized what was going on: when building the cluster, Storm provides a local cluster for testing:

```java
LocalCluster cluster = new LocalCluster();
```

Use that for local testing and deployment. When deploying to a real cluster, change:

```java
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
// to:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
```

to submit the job.
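To avoid hand-editing that line every time you switch between a developer machine and the cluster, the local-vs-remote decision can be pulled into a tiny testable helper. This is only a sketch; the `storm.local` property name it is meant to interpret is an assumption, not something from the original project:

```java
// Decides whether to run an in-process LocalCluster (true) or submit to a real
// cluster via StormSubmitter (false), based on a configuration value such as
// the hypothetical system property "storm.local".
public class SubmitMode {
    public static boolean useLocalCluster(String stormLocalProperty) {
        // Default to local when the property is absent: the safer choice
        // on a developer machine with no Nimbus running.
        return stormLocalProperty == null || Boolean.parseBoolean(stormLocalProperty);
    }
}
```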
That resolves problems 1-3 above.
Problem 4 is about using Spring beans inside a bolt. Even after annotating the class with @Component, the instance could not be obtained. My guess is that when we build and submit the topology, in:

```java
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2)
        .shuffleGrouping("kafka-spout");
```
Storm runs the bolt's lifecycle methods, such as:

```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    StormLauncher stormLauncher = StormLauncher.getStormLauncher();
    dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
}
```
rather than instantiating the bolt through Spring; the bolt runs on a different thread, so Spring injection never reaches it. (I don't fully understand the details here either; if anyone does, please share.)
The whole point of using Spring Boot is to avoid wiring up these objects by hand, so this problem bugged me for a long time. Eventually it occurred to me that we could fetch instances from the application context via getBean, so I set that up.
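The pattern being described — populate a static holder once at startup, then have the bolt's `prepare()` look beans up by name — can be sketched without any Storm or Spring classes. Here a plain `Map` stands in for the real `ApplicationContext`; everything below is illustrative, not code from the project:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal stand-in for the Spring context: beans registered by name.
// In the real project this role is played by ApplicationContext.getBean(...).
public class BeanHolder {
    private static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();

    // Called once, after the application context is ready.
    public static void register(String name, Object bean) {
        REGISTRY.put(name, bean);
    }

    // Called from the bolt's prepare(), which runs on a worker thread
    // where field injection never happened.
    public static Object getBean(String name) {
        Object bean = REGISTRY.get(name);
        if (bean == null) {
            throw new IllegalStateException("No bean named " + name);
        }
        return bean;
    }
}
```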
For example, I need a service inside the bolt:

```java
/**
 * @author Leezer
 * @date 2017/12/27
 * Stores operation-failure records in Redis.
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {

    private static final String ERRO = "erro";

    /**
     * @param type error type
     * @param key  key
     * @return error count
     **/
    @Override
    public String getErrNumFromRedis(String type, String key) {
        if (type == null || key == null) {
            return null;
        } else {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            return valueOper.get(String.format("%s:%s:%s", ERRO, type, key));
        }
    }

    /**
     * @param type  error type
     * @param key   key
     * @param value value to store
     **/
    @Override
    public void setErrNumToRedis(String type, String key, String value) {
        try {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            valueOper.set(String.format("%s:%s:%s", ERRO, type, key), value,
                    Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info(Dictionaries.REDIS_ERROR_PREFIX
                    + String.format("failed to store key %s in Redis", key));
        }
    }
}
```
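As a side note, the `erro:type:key` format used by both methods can be factored into a pure helper, which makes the key scheme trivially testable. This is just a sketch mirroring the `ERRO` constant above; the class name is hypothetical:

```java
// Builds the Redis key "erro:<type>:<key>" shared by the getter and setter above.
public class ErrorKeys {
    private static final String PREFIX = "erro"; // mirrors the ERRO constant

    public static String of(String type, String key) {
        if (type == null || key == null) {
            throw new IllegalArgumentException("type and key must be non-null");
        }
        return String.format("%s:%s:%s", PREFIX, type, key);
    }
}
```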
Because the bean is given an explicit name, the bolt's prepare can fetch it with getBean and perform the corresponding operations.
Kafka then delivers the subscribed topic's messages to my bolt for processing. The getBean method itself is defined in the Spring Boot main class:
```java
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service", "storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties",
        "classpath:storm.properties"})
@ImportResource(locations = {
        "classpath:/configs/spring-hadoop.xml",
        "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {

    // Thread-safe singleton instance of the launcher.
    private volatile static StormLauncher stormLauncher;

    // The application context.
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
        // application.web(false).run(args); // would start Spring Boot without the web container
        application.run(args);
        StormLauncher s = new StormLauncher();
        s.setApplicationContext(application.context());
        setStormLauncher(s);
    }

    private static void setStormLauncher(StormLauncher stormLauncher) {
        StormLauncher.stormLauncher = stormLauncher;
    }

    public static StormLauncher getStormLauncher() {
        return stormLauncher;
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(StormLauncher.class);
    }

    /**
     * Gets the application context.
     *
     * @return the application context
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Sets the application context.
     *
     * @param appContext the context
     */
    private void setApplicationContext(ApplicationContext appContext) {
        this.context = appContext;
    }

    /**
     * Gets a bean by its custom name.
     *
     * @param name the name
     * @return the bean
     */
    public Object getBean(String name) {
        return context.getBean(name);
    }

    /**
     * Gets a bean by class.
     *
     * @param <T>   the type parameter
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }

    /**
     * Gets a bean by name and class.
     *
     * @param <T>   the type parameter
     * @param name  the name
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}
```
One more pitfall, this time with the Kafka client:

```
Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.
```

The project complains about the Kafka client because storm-kafka was built against Kafka 0.8, while NetworkSend changed in 0.9 and later. The kafka-clients version on the classpath must match the Kafka you are actually integrating with.
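One way to keep the versions aligned is to exclude the transitive kafka-clients (as the storm-kafka dependency above already does) and declare the client version explicitly. 0.10.1.1 is the version this project's pom pins; yours must match your actual brokers:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- must match the Kafka version actually deployed -->
    <version>0.10.1.1</version>
</dependency>
```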
The integration itself is fairly simple, but references are scarce, and since I had only just started with Storm there was a lot of thinking involved, so I'm recording it here.
Project repository - github