diff --git a/plugins/xiuren_image/main.py b/plugins/xiuren_image/main.py index 38193b2..a97f8de 100644 --- a/plugins/xiuren_image/main.py +++ b/plugins/xiuren_image/main.py @@ -10,6 +10,7 @@ from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from plugins.xiuren_image.images_cache import ImageCacheManager from utils.decorator.plugin_decorators import plugin_stats_decorator +from utils.decorator.rate_limit_decorator import group_feature_rate_limit from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost @@ -95,7 +96,8 @@ class XiurenImagePlugin(MessagePluginInterface): # 初始化图片缓存管理器 self.image_cache_manager = ImageCacheManager(self.image_folder, cache_size) - self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},图片目录:{self.image_folder},缓存大小:{cache_size}") + self.LOG.info( + f"[{self.name}] 插件初始化完成,指令:{self._commands},图片目录:{self.image_folder},缓存大小:{cache_size}") return True def start(self) -> bool: @@ -122,6 +124,7 @@ class XiurenImagePlugin(MessagePluginInterface): @plugin_stats_decorator(plugin_name="秀人网图片") @plugin_points_cost(5, "秀人网图片消耗积分", FEATURE_KEY) + @group_feature_rate_limit(max_per_minute=5, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() @@ -148,7 +151,7 @@ class XiurenImagePlugin(MessagePluginInterface): # 发送图片 image_data = cached_image['bytes'] image_path = cached_image['path'] - + # 记录缓存状态 cache_count = self.image_cache_manager.get_cached_image_count() self.LOG.info(f"从缓存获取图片成功,路径:{image_path},当前缓存数量:{cache_count}") @@ -175,7 +178,7 @@ class XiurenImagePlugin(MessagePluginInterface): cached_image = self.image_cache_manager.get_cached_image_bytes() if cached_image: return cached_image - + # 如果缓存中没有,回退到原来的方式 self.LOG.warning("缓存中没有图片,回退到磁盘读取") pic_path = self._get_random_pic() @@ -191,9 +194,9 @@ class XiurenImagePlugin(MessagePluginInterface): except Exception as e: self.LOG.error(f"读取图片文件失败: {e}") return None - + return None - + except Exception as e: self.LOG.error(f"获取缓存图片失败: {e}") return None diff --git a/utils/decorator/rate_limit_decorator.py b/utils/decorator/rate_limit_decorator.py new file mode 100644 index 0000000..1e105cc --- /dev/null +++ b/utils/decorator/rate_limit_decorator.py @@ -0,0 +1,46 @@ +import time +import functools +from typing import Callable, Any, Dict, Tuple, Optional +from loguru import logger + +# 本地缓存,主键为 (group_id, feature_key) +_rate_limit_cache: Dict[Tuple[str, str], list] = {} + + +def group_feature_rate_limit(max_per_minute: int = 3, feature_key: str = None): + """ + 用于插件异步消息处理的限流装饰器,按 group_id 和 FEATURE_KEY 作为主键,每分钟最多允许 N 次。 + Args: + max_per_minute: 每分钟最大允许次数 + feature_key: 功能权限键名(可选,优先使用参数,其次用 self.FEATURE_KEY) + 用法: + @group_feature_rate_limit(max_per_minute=3, feature_key="DAILY_NEWS") + async def process_message(self, plugin_msg): + ... + """ + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def async_wrapper(self, message: Dict[str, Any], *args, **kwargs) -> Tuple[bool, str]: + group_id = message.get("roomid") + # 优先用参数,其次用 self.FEATURE_KEY + _feature_key = feature_key or getattr(self, "FEATURE_KEY", None) + if not group_id or not _feature_key: + # 缺少主键信息,不限流 + return await func(self, message, *args, **kwargs) + now = time.time() + key = (str(group_id), str(_feature_key)) + times = _rate_limit_cache.get(key, []) + # 只保留最近60秒内的记录 + times = [t for t in times if now - t < 60] + if len(times) >= max_per_minute: + logger.info( + f"限流: group_id={group_id}, feature_key={_feature_key}, {len(times)}/{max_per_minute} 次/分钟") + return False, f"触发频率过高,请稍后再试(限{max_per_minute}次/分钟)" + times.append(now) + _rate_limit_cache[key] = times + return await func(self, message, *args, **kwargs) + + return async_wrapper + + return decorator