From 7df4bd216f3fc462bc6c0a3635ec5ec1c05381e7 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 13 Apr 2026 15:53:34 +0800 Subject: [PATCH] feat: gate media downloads by group and retry douyu checks --- db/message_storage.py | 14 +++- plugins/douyu/main.py | 111 ++++++++++++++++++++++++------- utils/robot_cmd/robot_command.py | 1 + utils/wechat/message_to_db.py | 17 ++++- 4 files changed, 117 insertions(+), 26 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index c71aa13..7c658b6 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -532,16 +532,21 @@ class MessageStorageDB(BaseDBOperator): return self.execute_query(sql, tuple(params)) or [] - def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: + def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50, + group_ids: Optional[List[str]] = None) -> List[Dict]: """获取最近N分钟内未处理图片/表情消息(image_path IS NULL) Args: minutes_ago: 查询最近多少分钟的消息,默认10分钟 limit: 每次最多处理多少条,默认50条 + group_ids: 限制只查询指定群组,传空列表则直接返回空 Returns: 包含消息ID、群ID、消息XML等信息的列表 """ + if group_ids is not None and not group_ids: + return [] + sql = """ SELECT message_id, group_id, sender, message_type, message_xml, timestamp, attachment_url FROM messages @@ -553,7 +558,12 @@ class MessageStorageDB(BaseDBOperator): ORDER BY timestamp ASC LIMIT %s """ - params = (minutes_ago, limit) + params: List = [minutes_ago] + if group_ids is not None: + placeholders = ", ".join(["%s"] * len(group_ids)) + sql = sql.replace("ORDER BY timestamp ASC", f"AND group_id IN ({placeholders})\n ORDER BY timestamp ASC") + params.extend(group_ids) + params.append(limit) return self.execute_query(sql, params) or [] def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: diff --git a/plugins/douyu/main.py b/plugins/douyu/main.py index 77f1620..a745dd7 100644 --- a/plugins/douyu/main.py +++ b/plugins/douyu/main.py @@ -50,6 +50,8 @@ class DouyuDanmuRecorder: self._latest_vip_count: Optional[int] = None self._latest_diamond_count: Optional[int] = None self._last_stats_signature: Tuple[Optional[int], Optional[int]] = (None, None) + self._connect_retry_count = 3 + self._connect_retry_delay_seconds = 1 def _encode(self, msg: str) -> bytes: content = msg.encode("utf-8") + b"\x00" @@ -223,24 +225,51 @@ class DouyuDanmuRecorder: for url in ws_urls: if self._stop_event.is_set(): break - try: - self._ws = websocket.WebSocketApp( - url, - on_open=self._on_open, - on_message=self._on_message, - on_error=self._on_error, - on_close=self._on_close, - header=headers, - ) - self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) - except Exception as e: - logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}") - continue - finally: - self._ws = None + for attempt in range(1, self._connect_retry_count + 1): + if self._stop_event.is_set(): + break + reconnect_needed = False + try: + self._ws = websocket.WebSocketApp( + url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close, + header=headers, + ) + self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) + if self._stop_event.is_set(): + break + reconnect_needed = True + except Exception as e: + if attempt < self._connect_retry_count: + logger.warning( + f"斗鱼弹幕连接失败({self.room_id}),第{attempt}/{self._connect_retry_count}次重试: " + f"url={url} err={e}" + ) + time.sleep(self._connect_retry_delay_seconds) + continue + logger.error( + f"斗鱼弹幕连接失败({self.room_id}),已重试{self._connect_retry_count}次: " + f"url={url} err={e}" + ) + finally: + self._ws = None + if reconnect_needed and attempt < self._connect_retry_count: + logger.warning( + f"斗鱼弹幕连接中断({self.room_id}),第{attempt}/{self._connect_retry_count}次重试: url={url}" + ) + time.sleep(self._connect_retry_delay_seconds) + continue + if reconnect_needed and attempt >= self._connect_retry_count: + logger.error( + f"斗鱼弹幕连接中断({self.room_id}),已重试{self._connect_retry_count}次: url={url}" + ) + break if self._stop_event.is_set(): break - time.sleep(1) + time.sleep(self._connect_retry_delay_seconds) finally: self._ws = None @@ -490,6 +519,8 @@ class DouyuPlugin(MessagePluginInterface): self._daily_report_max_length = 1800 self._daily_report_send_image = True self._audience_stats_sample_interval_seconds = 60 + self._status_check_retry_count = 3 + self._status_check_retry_delay_seconds = 1 self._daily_report_llm_client: Optional[UnifiedLLMClient] = None self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {} async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) @@ -502,6 +533,34 @@ class DouyuPlugin(MessagePluginInterface): return f"{type(exc).__name__}: {message}" return type(exc).__name__ + async def _fetch_json_with_retries(self, session: aiohttp.ClientSession, url: str, + headers: Dict[str, str], context: str, + params: Optional[Dict[str, Any]] = None) -> Any: + last_error: Optional[Exception] = None + for attempt in range(1, self._status_check_retry_count + 1): + try: + async with session.get( + url, + headers=headers, + params=params, + timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + resp.raise_for_status() + return await resp.json(content_type=None) + except Exception as e: + last_error = e + if attempt < self._status_check_retry_count: + logger.warning( + f"{context}失败,第{attempt}/{self._status_check_retry_count}次重试: " + f"{self._format_exception(e)}" + ) + await asyncio.sleep(self._status_check_retry_delay_seconds) + continue + raise + if last_error: + raise last_error + raise RuntimeError(f"{context}失败,未获取到有效响应") + async def _scheduled_unified_check_job(self): """统一检查直播和鱼吧动态""" await self._scheduled_check_job() @@ -696,9 +755,12 @@ class DouyuPlugin(MessagePluginInterface): "User-Agent": self._user_agent, "Referer": f"https://www.douyu.com/{room_id}" } - async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: - resp.raise_for_status() - data = await resp.json(content_type=None) + data = await self._fetch_json_with_retries( + session, + url, + headers, + context=f"斗鱼在线检查(room_id={room_id})" + ) room_info = data.get("room", {}) if isinstance(data, dict) else {} show_status = room_info.get("show_status") nickname = room_info.get("nickname", "") @@ -825,10 +887,13 @@ class DouyuPlugin(MessagePluginInterface): "User-Agent": self._user_agent, "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news", } - async with session.get(self._yuba_api, headers=headers, params=params, - timeout=aiohttp.ClientTimeout(total=10)) as resp: - resp.raise_for_status() - data = await resp.json(content_type=None) + data = await self._fetch_json_with_retries( + session, + self._yuba_api, + headers, + context=f"斗鱼鱼吧检查(hash_id={hash_id})", + params=params + ) if data.get("error") != 0: logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}") diff --git a/utils/robot_cmd/robot_command.py b/utils/robot_cmd/robot_command.py index c9092ab..0e2143c 100644 --- a/utils/robot_cmd/robot_command.py +++ b/utils/robot_cmd/robot_command.py @@ -32,6 +32,7 @@ class Feature(Enum): EPIC = 4, "📊 EPIC自动播报 [每周五自动发送]" # 新增的功能 DAILY_SUMMARY = 5, "🕤 每日群发言总结 [每日9:30定时发送]" GROUP_MEMBER_CHANGE = 6, "👥 群成员变更监控 [自动监控群成员变动并发送通知]" + MEDIA_DOWNLOAD = 7, "🖼️ 图片表情下载 [控制群图片/表情媒资自动下载]" # DAILY_SUMMARY = 3, "🕤 每日群发言总结 [每日9:30定时发送]" # AI_CAPABILITY = 4, "🤖 AI对话 [ai, 聊天, AI] 用法:ai 如何写一个机器人?" diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 4a15d47..9f8029d 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -19,6 +19,7 @@ from db.levels_db import LevelsDBOperator from db.message_storage import MessageStorageDB # 导入积分系统 from db.points_db import PointsDBOperator, PointSource +from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType @@ -317,8 +318,22 @@ class MessageStorage: batch_size: 每次处理多少条,默认20条 """ try: + enabled_groups = [ + group_id + for group_id in GroupBotManager.local_cache.get("group_list", set()) + if GroupBotManager.get_group_permission(group_id, Feature.MEDIA_DOWNLOAD) == PermissionStatus.ENABLED + ] + + if not enabled_groups: + logger.debug("媒体下载功能未在任何群启用,跳过本轮媒体处理") + return + # 查询未处理的图片/表情消息 - pending_messages = self.message_db.get_pending_media_messages(minutes_ago, batch_size) + pending_messages = self.message_db.get_pending_media_messages( + minutes_ago, + batch_size, + group_ids=enabled_groups + ) if not pending_messages: logger.debug(f"未发现待处理的媒体消息(最近{minutes_ago}分钟)")