125 lines
5.1 KiB
Python
125 lines
5.1 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, time
|
|
from typing import Dict
|
|
|
|
|
|
@dataclass
class FlowState:
    """Mutable per-room snapshot of conversational flow.

    Instances are created with only ``room_id``; every other field starts at
    its default and is updated in place by ``FlowManager``.
    """

    # Key under which FlowManager stores this record.
    room_id: str

    # Current flow score and the coarse label derived from it.
    score: float = 0.0
    state: str = "idle"

    # Moment the score was last decayed; stamped at construction time.
    last_update: datetime = field(default_factory=datetime.now)

    # Timestamps of the latest bot reply / human message; None until one occurs.
    last_bot_reply_at: datetime | None = None
    last_human_message_at: datetime | None = None

    # Bot replies since the last human message (reset on each human message).
    bot_reply_streak: int = 0

    # Tallies read by FlowManager.get_acceptance_state.
    ignored_reply_count: int = 0
    accepted_reply_count: int = 0

    # Most recent non-empty topic seen in a message event.
    last_topic: str = ""
|
|
|
|
|
|
class FlowManager:
    """Maintain a per-room flow score and map it to a coarse state label.

    The score decays linearly with wall-clock time, is raised by flags on
    incoming message events, and is lowered by night-silence windows, ignored
    bot replies and over-long bot reply streaks. All tunables are read from
    ``config`` at call time, each with a built-in default.
    """

    # (event flag, config key, default boost) applied in this order.
    _EVENT_BOOSTS = (
        ("is_at", "at_bot_boost", 40),
        ("is_question", "question_boost", 30),
        ("is_followup", "followup_boost", 20),
        ("topic_hit", "topic_boost", 15),
        ("is_returning_member", "returning_member_boost", 10),
    )

    def __init__(self, config: Dict | None = None):
        """Store ``config`` (``None`` or empty means all defaults) and start with no rooms."""
        self.config = config or {}
        self.states: Dict[str, FlowState] = {}

    def _setting(self, key: str, default: float) -> float:
        """Return config value ``key`` coerced to float, or ``default`` if absent."""
        return float(self.config.get(key, default))

    def get_state(self, room_id: str) -> FlowState:
        """Return the state for ``room_id``, creating a fresh one on first use."""
        state = self.states.get(room_id)
        if state is None:
            state = self.states[room_id] = FlowState(room_id=room_id)
        return state

    def decay(self, room_id: str) -> FlowState:
        """Reduce the room's score for the time elapsed since its last update.

        The loss is ``flow_decay_per_minute`` (default 8) per elapsed minute;
        the score never drops below zero. ``last_update`` and ``state`` are
        refreshed. Returns the updated state.
        """
        state = self.get_state(room_id)
        now = datetime.now()
        # Clamp at zero so a clock that moved backwards never adds score.
        elapsed_minutes = max((now - state.last_update).total_seconds() / 60.0, 0.0)
        lost = elapsed_minutes * self._setting("flow_decay_per_minute", 8)
        state.score = max(0.0, state.score - lost)
        state.last_update = now
        state.state = self._score_to_state(state.score)
        return state

    def apply_message_event(self, room_id: str, event: Dict) -> FlowState:
        """Fold one human message event into the room's flow state.

        Args:
            room_id: Room the message belongs to.
            event: Mapping of optional flags (``is_at``, ``is_question``,
                ``is_followup``, ``topic_hit``, ``is_returning_member``,
                ``message_after_bot``) and an optional ``topic``. ``None`` is
                treated as an empty event.

        Returns:
            The updated state for the room.
        """
        event = event or {}
        state = self.decay(room_id)
        now = datetime.now()

        # The night penalty is applied before boosts, so boosts can still
        # lift a score that was just clamped to zero.
        if self._is_night_silent(now.time()):
            state.score = max(0.0, state.score - self._setting("night_penalty", 30))

        for flag, key, default in self._EVENT_BOOSTS:
            if event.get(flag):
                state.score += self._setting(key, default)

        # A message flagged as following the bot, arriving within the reply
        # window, counts as the bot's reply having been accepted.
        if state.last_bot_reply_at and event.get("message_after_bot"):
            since_reply = (now - state.last_bot_reply_at).total_seconds()
            if since_reply <= self._setting("reply_window_seconds", 180):
                state.score += self._setting("response_accepted_boost", 15)
                state.accepted_reply_count += 1

        state.bot_reply_streak = 0
        state.last_human_message_at = now
        state.last_topic = event.get("topic") or state.last_topic
        state.state = self._score_to_state(state.score)
        return state

    def note_bot_reply(self, room_id: str) -> FlowState:
        """Record that the bot has just replied in ``room_id``.

        If the previous bot reply has had no human message since and was sent
        within ``reply_window_seconds`` (default 180), it is counted as ignored
        and ``ignored_reply_penalty`` (default 20) is subtracted. Once the
        streak exceeds ``max_bot_reply_streak`` (default 3),
        ``over_reply_penalty`` (default 15) is subtracted as well.

        Returns:
            The updated state for the room.
        """
        state = self.decay(room_id)
        now = datetime.now()

        previous_reply = state.last_bot_reply_at
        last_human = state.last_human_message_at
        if previous_reply and (not last_human or last_human < previous_reply):
            since_previous = (now - previous_reply).total_seconds()
            if since_previous <= self._setting("reply_window_seconds", 180):
                state.ignored_reply_count += 1
                state.score = max(
                    0.0, state.score - self._setting("ignored_reply_penalty", 20)
                )

        # Reuse the timestamp used for the window check above.
        state.last_bot_reply_at = now
        state.bot_reply_streak += 1

        max_streak = int(self.config.get("max_bot_reply_streak", 3))
        if state.bot_reply_streak > max_streak:
            state.score = max(
                0.0, state.score - self._setting("over_reply_penalty", 15)
            )

        state.state = self._score_to_state(state.score)
        return state

    def get_acceptance_state(self, room_id: str) -> str:
        """Classify how the room has received bot replies so far.

        Returns ``"neutral"`` until at least three replies have been tallied,
        ``"warm"`` when at least 70% (and at least three) were accepted,
        ``"cold"`` when at most 35% were accepted and at least two were
        ignored, and ``"neutral"`` otherwise.
        """
        state = self.get_state(room_id)
        accepted = int(state.accepted_reply_count)
        ignored = int(state.ignored_reply_count)
        total = accepted + ignored
        if total < 3:
            return "neutral"

        ratio = accepted / total  # total >= 3 here, so no division by zero
        if ratio >= 0.7 and accepted >= 3:
            return "warm"
        if ratio <= 0.35 and ignored >= 2:
            return "cold"
        return "neutral"

    def _score_to_state(self, score: float) -> str:
        """Map ``score`` to a label using the configured ascending thresholds.

        Scores below ``idle_threshold`` (default 20) are ``"idle"``, below
        ``warming_threshold`` (40) ``"warming"``, below ``engaged_threshold``
        (70) ``"engaged"``; anything else is ``"deep_engaged"``.
        """
        bands = (
            ("idle_threshold", 20, "idle"),
            ("warming_threshold", 40, "warming"),
            ("engaged_threshold", 70, "engaged"),
        )
        for key, default, label in bands:
            if score < self._setting(key, default):
                return label
        return "deep_engaged"

    def _is_night_silent(self, current_time: time) -> bool:
        """Return True if ``current_time`` falls in any ``night_silent_hours`` window.

        Each window is a ``"HH:MM-HH:MM"`` string with inclusive bounds;
        whitespace around either bound is ignored. A window whose start is
        later than its end wraps past midnight. A single string is accepted in
        place of a list, a missing or ``None`` setting means no windows, and
        malformed windows are skipped rather than raising.
        """
        windows = self.config.get("night_silent_hours") or ()
        if isinstance(windows, str):
            # A bare string would otherwise be iterated character by character.
            windows = (windows,)

        for window in windows:
            try:
                start_text, end_text = window.split("-", 1)
                start = time.fromisoformat(start_text.strip())
                end = time.fromisoformat(end_text.strip())
                if start <= end:
                    inside = start <= current_time <= end
                else:
                    # Window wraps past midnight, e.g. 23:00-07:00.
                    inside = current_time >= start or current_time <= end
            except (AttributeError, TypeError, ValueError):
                # Non-string entry, missing dash, unparsable time, or a
                # naive/aware mismatch: best-effort, ignore this window.
                continue
            if inside:
                return True
        return False
|