nonebot 是一个 QQ 消息机器人框架,它的一些实现机制,值得参考。python
阅读 nonebot 文档,第一个示例以下:git
import nonebot if __name__ == '__main__': nonebot.init() nonebot.load_builtin_plugins() nonebot.run(host='127.0.0.1', port=8080)
首先思考一下,要运行几个 QQ 机器人,确定是要保存一些动态的数据的。可是从上面的示例看,咱们并无建立什么对象来保存动态数据,很简单的就直接调用 nontbot.run()
了。这说明动态的数据被隐藏在了 nonebot 内部。github
接下来详细分析这几行代码:web
第一步是 nonebot.init()
,该方法源码以下:shell
# 这个全局变量用于保存 NoneBot 对象 _bot: Optional[NoneBot] = None def init(config_object: Optional[Any] = None) -> None: global _bot _bot = NoneBot(config_object) # 经过传入的配置对象,构造 NoneBot 实例。 if _bot.config.DEBUG: # 根据是否 debug 模式,来配置日志级别 logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) # 在 websocket 启动前,先启动 scheduler(经过调用 quart 的 before_serving 装饰器) # 这其实是将 _start_scheduler 包装成一个 coroutine,而后丢到 quart 的 before_serving_funcs 队列中去。 _bot.server_app.before_serving(_start_scheduler) def _start_scheduler(): if scheduler and not scheduler.running: # 这个 scheduler 是使用的 apscheduler.schedulers.asyncio.AsyncIOScheduler scheduler.configure(_bot.config.APSCHEDULER_CONFIG) # 配置 scheduler 参数,该参数可经过 `nonebot.init()` 配置 scheduler.start() # 启动 scheduler,用于定时任务(如定时发送消息、每隔必定时间执行某任务) logger.info('Scheduler started')
能够看到,nonebot.init()
作了三件事:express
AsyncIOScheduler
。
AsyncIOScheduler
是一个异步 scheduler,这意味着它自己也会由 asyncio 的 eventloop 调度。它和 quart 是并发执行的。第二步是 nonebot.load_builtin_plugins()
,直接加载了 nonebot 内置的插件。该函数来自 plugin.py
:后端
class Plugin: __slots__ = ('module', 'name', 'usage') def __init__(self, module: Any, name: Optional[str] = None, usage: Optional[Any] = None): self.module = module # 插件对象自己 self.name = name # 插件名称 self.usage = usage # 插件的 help 字符串 # 和 `_bot` 相似的设计,用全局变量保存状态 _plugins: Set[Plugin] = set() def load_plugin(module_name: str) -> bool: try: module = importlib.import_module(module_name) # 经过模块名,动态 import 该模块 name = getattr(module, '__plugin_name__', None) usage = getattr(module, '__plugin_usage__', None) # 模块的全局变量 _plugins.add(Plugin(module, name, usage)) # 将加载好的模块放入 _plugins logger.info(f'Succeeded to import "{module_name}"') return True except Exception as e: logger.error(f'Failed to import "{module_name}", error: {e}') logger.exception(e) return False def load_plugins(plugin_dir: str, module_prefix: str) -> int: count = 0 for name in os.listdir(plugin_dir): # 遍历指定的文件夹 path = os.path.join(plugin_dir, name) if os.path.isfile(path) and \ (name.startswith('_') or not name.endswith('.py')): continue if os.path.isdir(path) and \ (name.startswith('_') or not os.path.exists( os.path.join(path, '__init__.py'))): continue m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name) if not m: continue if load_plugin(f'{module_prefix}.{m.group(1)}'): # 尝试加载该模块 count += 1 return count def load_builtin_plugins() -> int: plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins') # 获得内部 plugins 目录的路径 return load_plugins(plugin_dir, 'nonebot.plugins') # 直接加载该目录下的全部插件 def get_loaded_plugins() -> Set[Plugin]: """ 获取全部加载好的插件,通常用于提供命令帮助。 好比在收到 "帮助 拆字" 时,就从这里查询到 “拆字” 插件的 usage,返回给用户。 :return: a set of Plugin objects """ return _plugins
这就是插件的动态加载机制,能够看到获取已加载插件的惟一方法,就是 get_loaded_plugins()
,并且 plugins 是用集合来保存的。服务器
优化:仔细想一想,我以为用字典(Dict)来代替 Set 会更好一些,用“插件名”索引,这样能够防止出现同名的插件,并且查询插件时也不须要遍历整个 Set。websocket
load_plugin()
中手动注册。这两行以后,就直接 nonebot.run()
启动 quart 服务器了。session
从第一个例子中,只能看到上面这些。接下来考虑写一个自定义插件,看看 nonebot 的消息处理机制。项目结构以下:
awesome-bot ├── awesome │ └── plugins │ └── usage.py ├── bot.py └── config.py # 配置文件,写法参考 nonebot.default_config,建议使用类方式保存配置
bot.py
:
from os import path import nonebot import config if __name__ == '__main__': nonebot.init(config) # 使用自定义配置 nonebot.load_plugins( # 加载 awesome/plugins 下的自定义插件 path.join(path.dirname(__file__), 'awesome', 'plugins'), 'awesome.plugins' ) nonebot.run()
usage.py
:
import nonebot from nonebot import on_command, CommandSession @on_command('usage', aliases=['使用帮助', '帮助', '使用方法']) async def _(session: CommandSession): """以前说过的“帮助”命令""" plugins = list(filter(lambda p: p.name, nonebot.get_loaded_plugins())) arg = session.current_arg_text.strip().lower() if not arg: session.finish( '我如今支持的功能有:\n\n' + '\n'.join(p.name for p in plugins)) for p in plugins: # 若是 plugins 换成 dict 类型,就不须要循环遍历了 if p.name.lower() == arg: await session.send(p.usage)
查看装饰器 on_command
的内容,有 command/__init__.py
:
# key: one segment of command name # value: subtree or a leaf Command object _registry = {} # type: Dict[str, Union[Dict, Command]] # 保存命令与命令处理器 # key: alias # value: real command name _aliases = {} # type: Dict[str, CommandName_T] # 保存命令的别名(利用别名,从这里查找真正的命令名称,再用该名称查找命令处理器) # key: context id # value: CommandSession object _sessions = {} # type: Dict[str, CommandSession] # 保存与用户的会话,这样就能支持一些须要关联上下文的命令。好比赛文续传,或者须要花必定时间执行的命令,Session 有个 is_running。 def on_command(name: Union[str, CommandName_T], *, aliases: Iterable[str] = (), permission: int = perm.EVERYBODY, only_to_me: bool = True, privileged: bool = False, shell_like: bool = False) -> Callable: """ 用于注册命令处理器 :param name: 命令名称 (e.g. 'echo' or ('random', 'number')) :param aliases: 命令别名,建议用元组 :param permission: 该命令的默认权限 :param only_to_me: 是否只处理发送给“我”的消息 :param privileged: 已经存在此 session 时,是否仍然能被运行 :param shell_like: 使用相似 shell 的语法传递参数 """ def deco(func: CommandHandler_T) -> CommandHandler_T: if not isinstance(name, (str, tuple)): raise TypeError('the name of a command must be a str or tuple') if not name: raise ValueError('the name of a command must not be empty') cmd_name = (name,) if isinstance(name, str) else name cmd = Command(name=cmd_name, func=func, permission=permission, only_to_me=only_to_me, privileged=privileged) # 构造命令处理器 if shell_like: async def shell_like_args_parser(session): session.args['argv'] = shlex.split(session.current_arg) cmd.args_parser_func = shell_like_args_parser current_parent = _registry for parent_key in cmd_name[:-1]: # 循环将命令树添加到 _registry current_parent[parent_key] = current_parent.get(parent_key) or {} current_parent = current_parent[parent_key] current_parent[cmd_name[-1]] = cmd for alias in aliases: # 保存命令别名 _aliases[alias] = cmd_name return CommandFunc(cmd, func) return deco
该装饰器将命令处理器注册到模块的全局变量中,而后 quart 在收到消息时,会调用该模块的以下方法,查找对应的命令处理器,并使用它处理该命令:
async def handle_command(bot: NoneBot, ctx: Context_T) -> bool: """ 尝试将消息解析为命令,若是解析成功,并且用户拥有权限,就执行该命令。不然忽略。 此函数会被 "handle_message" 调用 """ cmd, current_arg = parse_command(bot, str(ctx['message']).lstrip()) # 尝试解析该命令 is_privileged_cmd = cmd and cmd.privileged if is_privileged_cmd and cmd.only_to_me and not ctx['to_me']: is_privileged_cmd = False disable_interaction = is_privileged_cmd if is_privileged_cmd: logger.debug(f'Command {cmd.name} is a privileged command') ctx_id = context_id(ctx) if not is_privileged_cmd: # wait for 1.5 seconds (at most) if the current session is running retry = 5 while retry > 0 and \ _sessions.get(ctx_id) and _sessions[ctx_id].running: retry -= 1 await asyncio.sleep(0.3) check_perm = True session = _sessions.get(ctx_id) if not is_privileged_cmd else None if session: if session.running: logger.warning(f'There is a session of command ' f'{session.cmd.name} running, notify the user') asyncio.ensure_future(send( bot, ctx, render_expression(bot.config.SESSION_RUNNING_EXPRESSION) )) # pretend we are successful, so that NLP won't handle it return True if session.is_valid: logger.debug(f'Session of command {session.cmd.name} exists') # since it's in a session, the user must be talking to me ctx['to_me'] = True session.refresh(ctx, current_arg=str(ctx['message'])) # there is no need to check permission for existing session check_perm = False else: # the session is expired, remove it logger.debug(f'Session of command {session.cmd.name} is expired') if ctx_id in _sessions: del _sessions[ctx_id] session = None if not session: if not cmd: logger.debug('Not a known command, ignored') return False if cmd.only_to_me and not ctx['to_me']: logger.debug('Not to me, ignored') return False session = CommandSession(bot, ctx, cmd, current_arg=current_arg) # 构造命令 Session,某些上下文相关的命令须要用到。 logger.debug(f'New session of command {session.cmd.name} created') return await _real_run_command(session, ctx_id, check_perm=check_perm, # 这个函数将命令处理函数包装成 task,而后等待该 task 完成,再返回结果。 disable_interaction=disable_interaction)
Web 中的 Session 通常是用于保存登陆状态,而聊天程序的 session,则主要是保存上下文。
若是要作赛文续传与成绩统计,Session 和 Command 确定是须要的,可是不能像 nonebot 这样作。
NoneBot 的命令格式限制得比较严,无法用来解析跟打器自动发送的成绩消息。也许命令应该更宽松:
还有就是 NoneBot 做者提到的一些问题:
本文为我的杂谈,不保证正确。若有错误,还请指正。