Mqtt做为物联网比较流行的协议如今已经被大范围使用,其中也有不少开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquette基于Netty实现,性能问题至少前期能够不用考虑,在使用过程当中还算稳定,没有出现过较大的问题。github地址:https://github.com/andsel/moq...。java
本文更加注重代码实践,对于配置相关的知识会一笔带过,不作过多的详解。
假设已经搭建好SpringBoot环境,下载完Moquette。至于怎么引用Moquette,能够在原项目上修改,也能够达成Jar包添加到lib调用,也能够上传到Maven私服后经过配置pom引用。笔者是上传到Maven私服,而后经过maven导入。git
@Slf4j @Service public class MoquetteServer { @Value("${mqtt-server.config-path}") private String configFilePath; @Autowired private IAuthorizator authorizator; /** * Safety相关的拦截器,若是有其它业务,能够再去实现一个拦截器处理其它业务 */ @Autowired @Qualifier("safetyInterceptHandler") private InterceptHandler safetyinterceptHandler; private Server mqttServer; public void startServer() throws IOException { IResourceLoader configFileResourceLoader = new ClasspathResourceLoader(configFilePath); final IConfig config = new ResourceLoaderConfig(configFileResourceLoader); mqttServer = new Server(); /**添加处理Safety相关的拦截器,若是有其它业务,能够再去实现一个拦截器处理其它业务,而后也添加上便可*/ List<InterceptHandler> interceptHandlers = Arrays.asList(safetyinterceptHandler); /** * Authenticator 不显示设置,Server会默认以password_file建立一个ResourceAuthenticator * 若是须要更灵活的链接验证方案,能够继承IAuthenticator接口,自定义实现 */ mqttServer.startServer(config, interceptHandlers, null, null, authorizator); } public void stop() { mqttServer.stopServer(); } }
解释:
(1)添加@Service
(2)configFilePath是Moquette须要的moquette.conf的配置文件路径,笔者将配置文件放到了resouces目录下,在application.xml配置的路径,所以经过@Value自动注入路径值。
(3)由于是将配置文件放在了resources目录下,因此用Moquette提供的ClasspathResourceLoader加载的配置文件
(4)IAuthorizator 是权限验证接口
(5)InterceptHandler是Moquette订阅、发布、创建链接等相关事件的拦截回调业务处理逻辑接口github
2.实现IAuthorizator接口segmentfault
@Component public class PermitAllAuthorizator implements IAuthorizator { @Override public boolean canWrite(Topic topic, String user, String client) { /**能够控制某个用户的client,是否具备发布某个主题的权限,目前默认任何Client能够发布任主题*/ return true; } @Override public boolean canRead(Topic topic, String user, String client) { /**能够控制某个用户的client,是否具备接收某个主题的权限,目前默认任何Client能够接收任何主题*/ return true; } }
解释:实现另外一个很简单的受权接口,容许任何用户全部的读写请求app
3.实现InterceptHandler接口maven
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } }
解释:
1.简单实现InterceptHandler,继承自AbstractInterceptHandler,并重写了部分方法。能够根据业务须要实现不一样的方法。InterceptHandler接口是Moquette预留给开发者根据不一样事件处理业务逻辑的接口。ide
4.经过SpringBoot启动Moquette性能
@SpringBootApplication public class MqttServiceApplication { public static void main(String[] args) throws IOException { SpringApplication application = new SpringApplication(MqttServiceApplication.class); final ApplicationContext context = application.run(args); MoquetteServer server = context.getBean(MoquetteServer.class); server.startServer(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { server.stop(); log.info("Moquette Server stopped"); } }); } }
若是发现MoquetteServer没法启动,是不是SpringBoot默认的包扫描机制的问题,能够经过@ComponentScan解决。优化
经过以上操做,就能够在任何想要使用MoquetteServer的地方,经过@Autowired自动注入。spa
固然在MoquetteServer中笔者只是简单实现了封装,并无实现其它方法,读者彻底能够根据本身的须要在MoquetteServer中实现本身须要的功能,但前提是你要对Moquette的源码熟悉。
moquette改造笔记(二):优化BrokerInterceptor notifyTopicPublished()逻辑