import re import random from datetime import datetime, time, timedelta import toml import os from loguru import logger class RoomState: """每个群的独立状态""" def __init__(self): self.participation_score = 0.0 self.last_active_time = datetime.now() self.last_bot_reply_time = None # 上次机器人回复的时间 class InterventionBot: def __init__(self, config_path=None): # 加载配置 self.config = {} if config_path and os.path.exists(config_path): self.config = toml.load(config_path) # 从配置中获取关键词 keywords = self.config.get("Keywords", {}) self.emojis = keywords.get("emojis", []) self.hot_topics = keywords.get("hot_topics", []) self.fish_keywords = keywords.get("fish_keywords", []) self.tech_keywords = keywords.get("tech_keywords", []) self.news_keywords = keywords.get("news_keywords",[]) # 拟人化配置 hl_config = self.config.get("HumanLike", {}) self.max_energy = hl_config.get("max_energy", 100.0) self.energy_recovery_rate = hl_config.get("energy_recovery_per_minute", 1.0) self.energy_cost = hl_config.get("energy_cost_per_reply", 15.0) self.participation_inc = hl_config.get("participation_increase_per_msg", 5.0) self.topic_bonus = hl_config.get("topic_match_bonus", 15.0) self.participation_threshold = hl_config.get("participation_threshold", 20.0) self.participation_drop = hl_config.get("participation_drop_factor", 0.8) self.base_prob = hl_config.get("base_reply_probability", 0.6) # 机器人全局状态 self.current_energy = self.max_energy self.last_energy_update_time = datetime.now() # 群组状态 {room_id: RoomState} self.room_states = {} # 辅助功能:早晨时间窗口 time_window = self.config.get("TimeWindow", {}) self.morning_window = ( time(time_window.get("morning_start_hour", 8), time_window.get("morning_start_minute", 0)), time(time_window.get("morning_end_hour", 8), time_window.get("morning_end_minute", 30)) ) def _get_room_state(self, room_id): if room_id not in self.room_states: self.room_states[room_id] = RoomState() return self.room_states[room_id] def _update_energy(self): """更新全局体力值""" now = datetime.now() minutes_passed = (now - self.last_energy_update_time).total_seconds() / 60.0 recovered = minutes_passed * self.energy_recovery_rate self.current_energy = min(self.max_energy, self.current_energy + recovered) self.last_energy_update_time = now logger.debug(f"[Energy] Recovered {recovered:.2f}, Current: {self.current_energy:.2f}") def detect_topic(self, message): if not isinstance(message, str): return None message_lower = message.lower() if any(keyword in message_lower for keyword in self.fish_keywords): return "fish" if any(keyword in message_lower for keyword in self.tech_keywords): return "tech" if any(keyword in message_lower for keyword in self.news_keywords): return "news" if any(keyword in message_lower for keyword in self.hot_topics): return "hot_topic" return None def is_morning_window(self, timestamp): try: # 简化时间处理,这里假设timestamp通常是当前时间附近 now = datetime.now() return self.morning_window[0] <= now.time() <= self.morning_window[1] except: return False def calculate_participation_boost(self, message, messages, is_at=False): """计算这条消息带来的参与度提升""" if is_at: return 100.0 # 被AT直接拉满 boost = self.participation_inc # 话题加成 topic = self.detect_topic(message) if topic: boost += self.topic_bonus # 关键词加成 (早安/表情等) if any(e in message for e in self.emojis): boost += 5 if "签到" in message or "早" in message: if self.is_morning_window(None): boost += 20 # 早上问好权重高 return boost def should_intervene(self, room_id, timestamp, message, messages, chat_log, is_at=False): """ 核心判定逻辑 :param room_id: 群ID :param timestamp: 消息时间 :param message: 当前消息内容 :param messages: 最近消息列表(文本) :param chat_log: 完整聊天记录对象 :param is_at: 是否被AT """ self._update_energy() state = self._get_room_state(room_id) # 1. 增加参与度(Listening) boost = self.calculate_participation_boost(message, messages, is_at) # 连续对话奖励:如果机器人在最近 2 分钟内回复过,说明可能在对话中,参与度增加翻倍 if state.last_bot_reply_time: time_since_last_reply = (datetime.now() - state.last_bot_reply_time).total_seconds() if time_since_last_reply < 120: # 2分钟内 boost *= 2.0 logger.debug(f"[{room_id}] 连续对话奖励触发 (上次回复 {int(time_since_last_reply)}s 前)") state.participation_score += boost state.last_active_time = datetime.now() logger.debug(f"[{room_id}] 收到消息: '{message}' | 参与度+{boost} -> {state.participation_score:.2f} | 体力: {self.current_energy:.2f}") # 2. 检查阈值 if state.participation_score < self.participation_threshold: return False # 3. 检查体力 if self.current_energy < self.energy_cost: logger.debug(f"[{room_id}] 体力不足 ({self.current_energy:.2f} < {self.energy_cost}),跳过") return False # 4. 概率判定 # 参与度越高,概率越高;体力越高,概率越高 # 归一化因子 participation_factor = min(state.participation_score / 100.0, 1.5) # 上限1.5倍 energy_factor = self.current_energy / self.max_energy final_prob = self.base_prob * participation_factor * energy_factor # 被AT必然回复 if is_at: final_prob = 1.0 # 随机判定 rand_val = random.random() should_reply = rand_val < final_prob logger.debug(f"[{room_id}] 判定: Prob={final_prob:.2f} (Base={self.base_prob} * Part={participation_factor:.2f} * Energy={energy_factor:.2f}) vs Rand={rand_val:.2f} -> {should_reply}") if should_reply: # 扣除消耗 self.current_energy -= self.energy_cost # 更新状态 state.last_bot_reply_time = datetime.now() # 降低参与度(满足了表达欲) # 改为减法,保留部分参与度以便连续对话 # 如果是高参与度(>50),减去 30;否则减半 if state.participation_score > 50: state.participation_score = max(0, state.participation_score - 40) else: state.participation_score *= 0.5 return True return False def rule_high_reply_rate(self, timestamp, chat_log): # 保留这个方法以兼容 main.py 的调用,或者在 main.py 中移除 # 这里我们可以简单的返回 False,因为新的逻辑已经包含了频率控制(通过体力值) return False