咱们知道Storm自己是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得咱们可以在Storm开发中运用Spring的Ioc容器及其余如Spring Jpa等功能呢?咱们先来了解如下概念:
Storm主要的三个Component:Topology、Spout、Bolt。Topology做为主进程控制着spout、bolt线程的运行,他们至关于独立运行的容器分布于storm集群中的各个机器节点。
SpringApplication:是配置Spring应用上下文的起点。经过调用SpringApplication.run()方法它将建立ApplicationContext实例,这是咱们可以使用Ioc容器的主要BeanFactory。以后Spring将会加载全部单例模式的beans,并启动后台运行的CommandLineRunner beans等。
ApplicationContextAware:这是咱们可以在普通Java类中调用Spring容器里的beans的关键接口。html
实现原理git
Storm框架中的每一个Spout和Bolt都至关于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。咱们能够把初始化Spring应用的操做放在这里,这样能够保证每一个spout/bolt应用在后续执行过程当中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的全部功能就都能用上了。github
Spout.open方法实现br/>@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
//启动Springboot应用
SpringStormApplication.run();web
this.map = map; this.topologyContext = topologyContext; this.spoutOutputCollector = spoutOutputCollector;
}br/>Bolt.prepare方法实现
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
//启动Springboot应用
SpringStormApplication.run();spring
this.map = map; this.topologyContext = topologyContext; this.outputCollector = outputCollector;
}br/>SpringStormApplication启动类
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
/**apache
@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);br/>}
}
在spout/bolt中调用了SpringStormApplication.run方法后,咱们还须要可以拿到ApplicationContext容器对象,这时候咱们还须要实现ApplicationContextAware接口,写个工具类BeanUtils:
@Component
public class BeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;springboot
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (BeanUtils.applicationContext == null) { BeanUtils.applicationContext = applicationContext; } } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String name) { return getApplicationContext().getBean(name); } public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); }
}
经过@Component注解使得Spring在启动时可以扫描到该bean,由于BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,以后咱们就可使用BeanUtils.getBean()去获取Spring容器中的bean了。app
写个简单例子框架
在FilterBolt的execute方法中获取Spring beanbr/>@Override
public void execute(Tuple tuple) {
FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
filterService.deleteAll();
}
定义FilterService类,这时候咱们就可使用Spring的相关注解,自动注入,Spring Jpa等功能了。br/>@Service("filterService")
public class FilterService {br/>@Autowired
UserRepository userRepository;分布式
public void deleteAll() { userRepository.deleteAll(); }
}
将storm应用做为Springboot工程的一个子模块
工程主目录的pom文件仍是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
咱们须要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,以下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><exclusions><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j2</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic2</artifactId></exclusion></exclusions></dependency>OK,完美整合!