从 v4.1 版本开始,EMQ X MQTT 服务器 提供了专门的多语言支持插件 emqx_extension_hook ,现已支持使用其余编程语言来处理 EMQ X 中的钩子事件,开发者可使用 Python 或者 Java 快速开发本身的插件,在官方功能的基础上进行扩展,知足本身的业务场景。例如:html
注:消息(Message) 类钩子,仅在企业版中支持。
Python 和 Java 驱动基于 Erlang/OTP-Port 进程间通讯实现,自己具备很是高的吞吐性能,本文以 Python 拓展为例介绍 EMQ X 跨语言拓展使用方式。java
经过 pip 命令在本地安装 SDK,确保使用 pip3 进行安装:python
pip3 install emqx-extension-sdk
修改 emqx-extension-hook
插件配置,正确使用拓展:git
## Setup the supported drivers ## ## Value: python2 | python3 | java exhook.drivers = python3 ## Search path for scripts/library exhook.drivers.python3.path = data/extension/hooks.py ## Call timeout ## ## Value: Duration ##exhook.drivers.python3.call_timeout = 5s ## Initial module name ## Your filename or module name exhook.drivers.python3.init_module = hooks
在 emqx/data/extension
目录下新建 hooks.py
文件,引入 SDK 编写业务逻辑,示例程序以下:github
## data/extension/hooks.py from emqx_extension.hooks import EmqxHookSdk, hooks_handler from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T # 继承 SDK HookSdk 类 class CustomHook(EmqxHookSdk): # 使用装饰器注册 hooks @hooks_handler() def on_client_connect(self, conninfo: EMQX_CLIENTINFO_PARSE_T = None, props: dict = None, state: list = None): print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte') @hooks_handler() def on_client_connected(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, state: list = None): print( f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected') @hooks_handler() def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, pubsub: str, topic: str, result: bool, state: tuple) -> bool: print( f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}') # 用户名为空时,ACL 验证不经过 if clientinfo.username == '': return False return True @hooks_handler() def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult, state) -> bool: print( f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate') # clientid 不为空时,验证经过 if clientinfo.clientid != '': return True return False # on_message_* 仅支持企业版 @hooks_handler() def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state): print( f'[Python SDK] [on_message_publish] {message.topic} {message.payload}') emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook') def init(): return emqx_hook.start() def deinit(): return
启动 emqx_extension_hook
插件,若是配置错误或代码编写错误将没法正常启动。启动后尝试创建 MQTT 链接并观察业务运行状况。数据库
./bin/emqx_ctl plugins load emqx_extension_hook
目前 EMQ X Python 拓展 SDK 是开源的,若是对可控性、性能要求更高,或须要使用 Python 2.7 版本的运行环境,欢迎贡献代码或基于原始示例进行开发:编程
版权声明: 本文为 EMQ 原创,转载请注明出处。原文连接:https://www.emqx.io/cn/blog/d...bash