从 v4.1 版本开始,EMQ X MQTT 服务器 提供了专门的多语言支持插件 emqx_extension_hook ,现已支持使用其余编程语言来处理 EMQ X 中的钩子事件,开发者可使用 Python 或者 Java 快速开发本身的插件,在官方功能的基础上进行扩展,知足本身的业务场景。例如:html
注:消息(Message) 类钩子,仅在企业版中支持。
Python 和 Java 驱动基于 Erlang/OTP-Port 进程间通讯实现,自己具备很是高的吞吐性能,本文以 Java 拓展为例介绍 EMQ X 跨语言拓展使用方式。java
io.emqx.extension.jar
和 erlport.jar
到项目依赖examples/SampleHandler.java
到您的项目中SampleHandler.java
中的示例编写业务代码,确保可以成功编译编译全部源代码后,须要将 sdk
和代码文件部署到 EMQ X 中:git
io.emqx.extension.jar
到 emqx/data/extension
目录.class
文件,例如 SampleHandler.class
复制到 emqx/data/extension
目录emqx/etc/plugins/emqx_extension_hook.conf
配置文件:exhook.drivers = java ## Search path for scripts or library exhook.drivers.java.path = data/extension/ exhook.drivers.java.init_module = SampleHandler
启动 emqx_extension_hook
插件,若是配置错误或 Java 代码编写错误将没法正常启动。启动后尝试创建 MQTT 链接并观察业务运行状况。github
如下为 SampleHandler.java 示例程序, 该程序继承自 SDK 中的 DefaultCommunicationHandler
类。该示例代码演示了如何挂载 EMQ X 系统中全部的钩子:数据库
import emqx.extension.java.handler.*; import emqx.extension.java.handler.codec.*; import emqx.extension.java.handler.ActionOptionConfig.Keys; public class SampleHandler extends DefaultCommunicationHandler { @Override public ActionOptionConfig getActionOption() { ActionOptionConfig option = new ActionOptionConfig(); option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#"); option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#"); option.set(Keys.MESSAGE_ACKED_TOPICS, "#"); option.set(Keys.MESSAGE_DROPPED_TOPICS, "#"); return option; } // Clients @Override public void onClientConnect(ConnInfo connInfo, Property[] props) { System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props); } @Override public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) { System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props); } @Override public void onClientConnected(ClientInfo clientInfo) { System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo); } @Override public void onClientDisconnected(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason); } // 断定认证结果,返回 true 或 false @Override public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) { System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult); return true; } // 断定 ACL 检查结果,返回 true 或 false @Override public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) { System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result); return true; } @Override public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) { System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } @Override public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) { System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } // Sessions @Override public void onSessionCreated(ClientInfo clientInfo) { System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo); } @Override public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) { System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) { System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionResumed(ClientInfo clientInfo) { System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo); } @Override public void onSessionDiscarded(ClientInfo clientInfo) { System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo); } @Override public void onSessionTakeovered(ClientInfo clientInfo) { System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo); } @Override public void onSessionTerminated(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason); } // Messages @Override public Message onMessagePublish(Message message) { System.err.printf("[Java] onMessagePublish: message: %s\n", message); return message; } @Override public void onMessageDropped(Message message, Reason reason) { System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason); } @Override public void onMessageDelivered(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message); } @Override public void onMessageAcked(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message); } }
SampleHandler
主要包含两部分:编程
重载了 getActionOption
方法。该方法对消息(Message)相关的钩子进行配置,指定了须要生效的主题列表。bash
配置项 | 对应钩子 |
---|---|
MESSAGE_PUBLISH_TOPICS | message_publish |
MESSAGE_DELIVERED_TOPICS | message_delivered |
MESSAGE_ACKED_TOPICS | message_acked |
MESSAGE_DROPPED_TOPICS | message_dropped |
on<hookName>
方法,这些方法是实际处理钩子事件的回调函数,函数命名方式为各个钩子名称变体后前面加 on
前缀,变体方式为钩子名称去掉下划线后使用骆驼拼写法(CamelCase),例如,钩子client_connect对应的函数名为onClientConnect。 EMQ X 客户端产生的事件,例如:链接、发布、订阅等,都会最终分发到这些钩子事件回调函数上,而后回调函数可对各属性及状态进行相关操做。 示例程序中仅对各参数进行了打印输出。若是只关心部分钩子事件,只需对这部分钩子事件的回调函数进行重载便可,不须要重载全部回调函数。各回调函数的执行时机和支持的钩子列表与 EMQ X 内置的钩子彻底一致,参见:Hooks - EMQ X服务器
在实现本身的扩展程序时,最简单的方式也是继承 DefaultCommunicationHandler
父类,该类对各钩子与回调函数的绑定进行了封装,并进一步封装了回调函数涉及到的参数数据结构,以方便快速上手使用。数据结构
若是对 Java 扩展程序的可控性要求更高,DefaultCommunicationHandler
类已没法知足需求时,能够经过实现 CommunicationHandler
接口,从更底层控制代码逻辑,编写更灵活的扩展程序。编程语言
package emqx.extension.java.handler; public interface CommunicationHandler { public Object init(); public void deinit(); }
init()
方法:用于初始化,声明扩展须要挂载哪些钩子,以及挂载的配置deinit()
方法:用于注销。详细数据格式说明,参见 设计文档。
版权声明: 本文为 EMQ 原创,转载请注明出处。原文连接:https://www.emqx.io/cn/blog/develop-emqx-plugin-using-java