605 lines
28 KiB
Python
605 lines
28 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
import re
|
||
from collections import defaultdict
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
from db.contacts_db import ContactsDBOperator
|
||
from db.member_digest_db import MemberDigestDBOperator
|
||
from db.message_storage import MessageStorageDB
|
||
from plugins.member_context.dify_client import DifyClient
|
||
from utils.compress_chat_data import compress_chat_data
|
||
|
||
|
||
class MemberDigestService:
|
||
"""成员分层摘要服务"""
|
||
|
||
def __init__(self, contacts_db: ContactsDBOperator, message_db: MessageStorageDB,
|
||
digest_db: MemberDigestDBOperator, plugin_config: Optional[Dict] = None):
|
||
self.contacts_db = contacts_db
|
||
self.message_db = message_db
|
||
self.digest_db = digest_db
|
||
self.LOG = logger
|
||
self.plugin_config = plugin_config or {}
|
||
|
||
api_config = self.plugin_config.get("api", {})
|
||
profile_config = self.plugin_config.get("profile", {})
|
||
self.dify_client = DifyClient(api_config)
|
||
|
||
self.ai_enabled = self.dify_client.enabled
|
||
self.ai_base_url = self.dify_client.base_url
|
||
self.ai_api_key = self.dify_client.api_key
|
||
self.ai_endpoint = self.dify_client.endpoint
|
||
self.ai_timeout = self.dify_client.timeout
|
||
|
||
self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
|
||
self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
|
||
self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
|
||
self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
|
||
self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
|
||
self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
|
||
self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
|
||
self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
|
||
self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
|
||
self.group_digest_days = int(profile_config.get("group_digest_days", 1))
|
||
|
||
def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
|
||
force: bool = False) -> Dict:
|
||
bootstrap_mode = False
|
||
if days is None:
|
||
bootstrap_mode = self._should_bootstrap_group(chatroom_id)
|
||
days = self.bootstrap_days if bootstrap_mode else self.group_digest_days
|
||
|
||
built_daily = 0
|
||
touched_members = set()
|
||
processed_dates = []
|
||
|
||
for offset in range(days):
|
||
target_date = (datetime.now() - timedelta(days=offset + 1)).strftime("%Y-%m-%d")
|
||
digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
|
||
processed_dates.append(target_date)
|
||
for digest in digests:
|
||
self.digest_db.save_digest(digest)
|
||
built_daily += 1
|
||
touched_members.add(digest.get("wxid"))
|
||
|
||
mode = "bootstrap" if bootstrap_mode else "incremental"
|
||
if bootstrap_mode:
|
||
bootstrap_status = "done" if built_daily > 0 or touched_members else "empty"
|
||
self.digest_db.save_group_state(
|
||
chatroom_id,
|
||
bootstrap_status=bootstrap_status,
|
||
bootstrap_days=days,
|
||
built_daily_count=built_daily,
|
||
touched_member_count=len(touched_members),
|
||
extra={"processed_dates": processed_dates},
|
||
)
|
||
self.LOG.info(
|
||
f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
|
||
f"built_daily={built_daily}, touched_members={len(touched_members)}"
|
||
)
|
||
return {
|
||
"built_daily": built_daily,
|
||
"touched_members": list(touched_members),
|
||
"days": days,
|
||
"mode": mode,
|
||
"processed_dates": processed_dates,
|
||
}
|
||
|
||
def _should_bootstrap_group(self, chatroom_id: str) -> bool:
|
||
try:
|
||
group_state = self.digest_db.get_group_state(chatroom_id)
|
||
if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
|
||
return False
|
||
sql = """
|
||
SELECT 1
|
||
FROM t_member_digest
|
||
WHERE chatroom_id = %s AND digest_type = 'daily'
|
||
LIMIT 1
|
||
"""
|
||
row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
|
||
return not bool(row)
|
||
except Exception as e:
|
||
self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
|
||
return False
|
||
|
||
def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False,
|
||
enable_weekly: bool = True, enable_monthly: bool = True) -> Dict:
|
||
member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
|
||
display_name = member.get("display_name") or member.get("nick_name") or wxid
|
||
|
||
all_daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
|
||
if not all_daily_digests:
|
||
return {
|
||
"display_name": display_name,
|
||
"daily_digests": [],
|
||
"weekly_digests": [],
|
||
"monthly_digests": [],
|
||
"all_daily_digests": [],
|
||
"all_weekly_digests": [],
|
||
"all_monthly_digests": [],
|
||
"stats": {"daily": 0, "weekly": 0, "monthly": 0, "active_days": 0, "built_daily": 0},
|
||
}
|
||
|
||
built_weekly = 0
|
||
built_monthly = 0
|
||
# 周/月摘要改成“触发即补偿”的思路:
|
||
# 1. 只要进入周/月任务,就检查历史上所有“已完结但缺失”的周期;
|
||
# 2. 缺了就补,不再依赖“今天刚好是不是周日/月末”这种单点窗口;
|
||
# 3. 具体是否跳过当前未完结周期,放到 _ensure_weekly_digests / _ensure_monthly_digests 内部判断。
|
||
if enable_weekly:
|
||
built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
|
||
|
||
# 月摘要依赖周摘要,所以这里默认在周摘要补偿完成后再继续检查月摘要缺口。
|
||
if enable_monthly:
|
||
built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)
|
||
|
||
all_weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
|
||
all_monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=120)
|
||
|
||
daily_digests = all_daily_digests[:self.final_daily_limit]
|
||
weekly_digests = all_weekly_digests[:self.final_weekly_limit]
|
||
monthly_digests = all_monthly_digests[:self.final_monthly_limit]
|
||
|
||
return {
|
||
"display_name": display_name,
|
||
"daily_digests": daily_digests,
|
||
"weekly_digests": weekly_digests,
|
||
"monthly_digests": monthly_digests,
|
||
"all_daily_digests": all_daily_digests,
|
||
"all_weekly_digests": all_weekly_digests,
|
||
"all_monthly_digests": all_monthly_digests,
|
||
"stats": {
|
||
"daily": len(all_daily_digests),
|
||
"weekly": len(all_weekly_digests),
|
||
"monthly": len(all_monthly_digests),
|
||
"active_days": len(all_daily_digests),
|
||
"built_daily": 0,
|
||
"built_weekly": built_weekly,
|
||
"built_monthly": built_monthly,
|
||
},
|
||
}
|
||
|
||
@staticmethod
|
||
def _get_closed_reference_date() -> datetime:
|
||
# 摘要只基于“已经完整结束的一天”做补偿判断。
|
||
# 例如凌晨跑任务时,当天仍在进行中,所以统一以“昨天”为参照,
|
||
# 这样就能稳定地判断出“哪些周/月已经完结,哪些还是当前进行中的周期”。
|
||
return datetime.now() - timedelta(days=1)
|
||
|
||
@staticmethod
|
||
def _normalize_profile_item(item: Dict) -> Dict:
|
||
normalized = {}
|
||
list_keys = {
|
||
"topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
|
||
"discussion_scenarios", "problem_solving_signals", "value_preferences", "habit_signals",
|
||
"expression_markers", "engagement_traits", "reply_entry_points", "reply_taboos",
|
||
"representative_messages", "stable_topics", "identity_traits", "skill_profile",
|
||
"problem_solving_profile", "family_profile", "life_stage_profile", "value_profile",
|
||
"stable_traits", "habit_patterns", "expression_profile", "reply_entry_profile",
|
||
"reply_preferences", "long_term_topics", "long_term_reply_preferences", "common_scenarios",
|
||
"phase_state", "recent_state"
|
||
}
|
||
for key, value in item.items():
|
||
if key in list_keys:
|
||
if isinstance(value, list):
|
||
normalized[key] = [str(v).strip() for v in value if str(v).strip()]
|
||
elif value:
|
||
normalized[key] = [str(value).strip()]
|
||
else:
|
||
normalized[key] = []
|
||
elif isinstance(value, (int, float)):
|
||
normalized[key] = value
|
||
else:
|
||
normalized[key] = str(value).strip()
|
||
return normalized
|
||
|
||
def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
|
||
daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
|
||
if not daily_digests:
|
||
return 0
|
||
|
||
grouped = defaultdict(list)
|
||
for item in daily_digests:
|
||
week_key, _, _ = self._week_period_bounds(item.get("period_key"))
|
||
grouped[week_key].append(item)
|
||
|
||
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
|
||
reference_date = self._get_closed_reference_date()
|
||
current_week_key, _, _ = self._week_period_bounds(reference_date.strftime("%Y-%m-%d"))
|
||
built = 0
|
||
for week_key, items in sorted(grouped.items()):
|
||
if len(items) < 2:
|
||
continue
|
||
# 非强制模式下,只补“已完结”的历史周:
|
||
# 1. 当前参照周还没走完,不能提前生成;
|
||
# 2. 已存在的历史周不重复生成;
|
||
# 3. 这样周任务每次触发时,都能把之前漏掉的周摘要自动补齐。
|
||
if not force and week_key == current_week_key:
|
||
continue
|
||
if not force and week_key in existing_keys:
|
||
continue
|
||
period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key"))
|
||
digest = self._build_period_digest(
|
||
"weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
|
||
)
|
||
if digest:
|
||
self.digest_db.save_digest(digest)
|
||
built += 1
|
||
self.LOG.info(
|
||
f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
|
||
f"week={period_key}, days={len(items)}"
|
||
)
|
||
return built
|
||
|
||
def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
|
||
weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
|
||
if not weekly_digests:
|
||
return 0
|
||
|
||
grouped = defaultdict(list)
|
||
for item in weekly_digests:
|
||
month_key, _, _ = self._month_period_bounds(item.get("period_end"))
|
||
grouped[month_key].append(item)
|
||
|
||
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
|
||
reference_date = self._get_closed_reference_date()
|
||
current_month_key, _, _ = self._month_period_bounds(reference_date.strftime("%Y-%m-%d"))
|
||
built = 0
|
||
for month_key, items in sorted(grouped.items()):
|
||
if len(items) < 2:
|
||
continue
|
||
# 月摘要同样只补“已经完结的月份”:
|
||
# 1. 当前月仍可能继续产生新周摘要,不能过早固化;
|
||
# 2. 历史缺失月份在月任务/周任务触发时都会被补齐;
|
||
# 3. 这样即使某次月任务漏跑,后续任务也能自动追平。
|
||
if not force and month_key == current_month_key:
|
||
continue
|
||
if not force and month_key in existing_keys:
|
||
continue
|
||
period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end"))
|
||
digest = self._build_period_digest(
|
||
"monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
|
||
)
|
||
if digest:
|
||
self.digest_db.save_digest(digest)
|
||
built += 1
|
||
self.LOG.info(
|
||
f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
|
||
f"month={period_key}, weeks={len(items)}"
|
||
)
|
||
return built
|
||
|
||
def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
|
||
members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
|
||
member_name_map = {}
|
||
for member in members:
|
||
wxid = member.get("wxid")
|
||
if not wxid:
|
||
continue
|
||
member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid
|
||
|
||
messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
|
||
if not messages:
|
||
return []
|
||
|
||
sender_messages = defaultdict(list)
|
||
for msg in messages:
|
||
wxid = msg.get("sender")
|
||
if not wxid:
|
||
continue
|
||
sender_messages[wxid].append(msg)
|
||
|
||
candidate_wxids = [
|
||
wxid for wxid, items in sender_messages.items()
|
||
if len(items) >= self.daily_digest_min_messages
|
||
]
|
||
if not candidate_wxids:
|
||
return []
|
||
|
||
pending_wxids = []
|
||
for wxid in candidate_wxids:
|
||
if not force and self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date):
|
||
continue
|
||
pending_wxids.append(wxid)
|
||
if not pending_wxids:
|
||
self.LOG.info(
|
||
f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
|
||
f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
|
||
)
|
||
return []
|
||
|
||
member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
|
||
compact_chat = self._format_group_messages_optimized(messages, member_name_map)
|
||
try:
|
||
compact_chat = compress_chat_data(compact_chat)
|
||
except Exception as e:
|
||
self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")
|
||
|
||
parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
|
||
parsed_map = {item.get("wxid"): item for item in parsed_members if item.get("wxid")} if parsed_members else {}
|
||
|
||
digests = []
|
||
for wxid in pending_wxids:
|
||
parsed = parsed_map.get(wxid)
|
||
if not parsed:
|
||
self.LOG.warning(
|
||
f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): "
|
||
f"group={chatroom_id}, date={digest_date}, wxid={wxid}, "
|
||
f"source_count={len(sender_messages.get(wxid, []))}"
|
||
)
|
||
continue
|
||
parsed = self._normalize_profile_item(parsed)
|
||
digests.append({
|
||
"chatroom_id": chatroom_id,
|
||
"wxid": wxid,
|
||
"digest_type": "daily",
|
||
"period_key": digest_date,
|
||
"period_start": f"{digest_date} 00:00:00",
|
||
"period_end": f"{digest_date} 23:59:59",
|
||
"display_name": member_name_map.get(wxid, wxid),
|
||
"source_count": len(sender_messages.get(wxid, [])),
|
||
"summary_text": parsed.get("summary_text", ""),
|
||
"structured": parsed,
|
||
"meta": {
|
||
"source_type": "group_daily_messages",
|
||
"representative_messages": parsed.get("representative_messages", []),
|
||
},
|
||
})
|
||
self.LOG.info(
|
||
f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
|
||
f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
|
||
)
|
||
return digests
|
||
|
||
def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
|
||
period_key: str, period_start: str, period_end: str,
|
||
items: List[Dict]) -> Optional[Dict]:
|
||
parsed = self._request_period_json(
|
||
digest_type=digest_type,
|
||
chatroom_id=chatroom_id,
|
||
wxid=wxid,
|
||
display_name=display_name,
|
||
period_key=period_key,
|
||
items=items,
|
||
)
|
||
if not parsed:
|
||
self.LOG.warning(
|
||
f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): "
|
||
f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}, "
|
||
f"last_error={self.dify_client.last_error}"
|
||
)
|
||
return None
|
||
|
||
parsed = self._normalize_profile_item(parsed)
|
||
|
||
return {
|
||
"chatroom_id": chatroom_id,
|
||
"wxid": wxid,
|
||
"digest_type": digest_type,
|
||
"period_key": period_key,
|
||
"period_start": period_start,
|
||
"period_end": period_end,
|
||
"display_name": display_name,
|
||
"source_count": len(items),
|
||
"summary_text": parsed.get("summary_text", ""),
|
||
"structured": parsed,
|
||
"meta": {
|
||
"source_keys": [item.get("period_key") for item in items],
|
||
},
|
||
}
|
||
|
||
def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
|
||
if not self.dify_client.is_available():
|
||
return None
|
||
response = self.dify_client.run(
|
||
prompt=prompt,
|
||
user=f"member-digest:{chatroom_id}:{wxid}:{tag}",
|
||
inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag},
|
||
tag=tag,
|
||
)
|
||
if not response:
|
||
return None
|
||
parsed = self._parse_ai_answer(response.get("text", ""))
|
||
if parsed:
|
||
parsed["ai_usage"] = response.get("usage", {}) or {}
|
||
return parsed
|
||
|
||
def _request_period_json(self, digest_type: str, chatroom_id: str, wxid: str,
|
||
display_name: str, period_key: str, items: List[Dict]) -> Optional[Dict]:
|
||
if not self.dify_client.is_available():
|
||
return None
|
||
|
||
inputs = {
|
||
"digest_type": digest_type,
|
||
"chatroom_id": chatroom_id,
|
||
"wxid": wxid,
|
||
"display_name": display_name,
|
||
"period_key": period_key,
|
||
"source_items_json": json.dumps(self._build_period_source_items(items), ensure_ascii=False),
|
||
"source_item_count": str(len(items)),
|
||
}
|
||
response = self.dify_client.run(
|
||
prompt="",
|
||
user=f"member-digest:{chatroom_id}:{wxid}:{digest_type}:{period_key}",
|
||
inputs=inputs,
|
||
tag=f"{digest_type}:{period_key}",
|
||
)
|
||
if not response:
|
||
return None
|
||
parsed = self._parse_ai_answer(response.get("text", ""))
|
||
if parsed:
|
||
parsed["ai_usage"] = response.get("usage", {}) or {}
|
||
return parsed
|
||
|
||
def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
|
||
member_labels: List[str], compressed_chat: str) -> List[Dict]:
|
||
if not self.dify_client.is_available():
|
||
return []
|
||
response = self.dify_client.run(
|
||
prompt="",
|
||
user=f"member-digest:{chatroom_id}:group-daily:{digest_date}",
|
||
inputs={
|
||
"digest_type": "daily",
|
||
"chatroom_id": chatroom_id,
|
||
"digest_date": digest_date,
|
||
"member_labels": "\n".join(member_labels),
|
||
"compressed_chat": compressed_chat,
|
||
},
|
||
tag=f"group-daily:{digest_date}",
|
||
)
|
||
if not response:
|
||
return []
|
||
parsed = self._parse_group_daily_answer(response.get("text", ""))
|
||
return parsed
|
||
|
||
@staticmethod
|
||
def _build_period_source_items(items: List[Dict]) -> List[Dict]:
|
||
source_items = []
|
||
for item in items:
|
||
structured = item.get("structured", {}) or {}
|
||
source_items.append({
|
||
"period_key": item.get("period_key"),
|
||
"summary_text": item.get("summary_text", ""),
|
||
"topics": structured.get("topics") or structured.get("stable_topics") or structured.get("long_term_topics") or [],
|
||
"discussion_scenarios": structured.get("discussion_scenarios") or structured.get("common_scenarios") or [],
|
||
"identity_clues": structured.get("identity_clues") or structured.get("identity_traits") or [],
|
||
"skill_signals": structured.get("skill_signals") or structured.get("skill_profile") or [],
|
||
"problem_solving_signals": structured.get("problem_solving_signals") or structured.get("problem_solving_profile") or [],
|
||
"family_signals": structured.get("family_signals") or structured.get("family_profile") or [],
|
||
"life_stage_signals": structured.get("life_stage_signals") or structured.get("life_stage_profile") or [],
|
||
"value_preferences": structured.get("value_preferences") or structured.get("value_profile") or [],
|
||
"habit_signals": structured.get("habit_signals") or structured.get("habit_patterns") or [],
|
||
"expression_markers": structured.get("expression_markers") or structured.get("expression_profile") or [],
|
||
"engagement_traits": structured.get("engagement_traits") or structured.get("stable_traits") or [],
|
||
"reply_entry_points": structured.get("reply_entry_points") or structured.get("reply_entry_profile") or [],
|
||
"reply_preferences": structured.get("reply_preferences") or structured.get("long_term_reply_preferences") or [],
|
||
"social_role": structured.get("social_role") or structured.get("group_role") or "",
|
||
"decision_style": structured.get("decision_style") or structured.get("decision_profile") or "",
|
||
"temperament_signal": structured.get("temperament_signal") or structured.get("temperament_tendency") or "",
|
||
"recent_state": structured.get("recent_state") or structured.get("phase_state") or [],
|
||
})
|
||
return source_items
|
||
|
||
def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
|
||
if not answer:
|
||
return None
|
||
text = answer.strip()
|
||
match = re.search(r"\{.*\}", text, re.S)
|
||
if match:
|
||
text = match.group(0)
|
||
try:
|
||
data = json.loads(text)
|
||
except Exception:
|
||
return None
|
||
normalized = {}
|
||
for key, value in data.items():
|
||
if isinstance(value, list):
|
||
normalized[key] = [str(item).strip() for item in value if str(item).strip()]
|
||
elif isinstance(value, (int, float)):
|
||
normalized[key] = value
|
||
else:
|
||
normalized[key] = str(value).strip()
|
||
return normalized
|
||
|
||
def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
|
||
if not answer:
|
||
return []
|
||
text = answer.strip()
|
||
match = re.search(r"\{.*\}", text, re.S)
|
||
if match:
|
||
text = match.group(0)
|
||
try:
|
||
parsed = json.loads(text)
|
||
except Exception:
|
||
return []
|
||
members = parsed.get("members", [])
|
||
if not isinstance(members, list):
|
||
return []
|
||
normalized_map = {}
|
||
for item in members:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
normalized_item = self._normalize_profile_item(item)
|
||
wxid = normalized_item.get("wxid")
|
||
if not wxid:
|
||
continue
|
||
existing = normalized_map.get(wxid)
|
||
if not existing or self._score_profile_item(normalized_item) > self._score_profile_item(existing):
|
||
normalized_map[wxid] = normalized_item
|
||
return list(normalized_map.values())
|
||
|
||
@staticmethod
|
||
def _score_profile_item(item: Dict) -> float:
|
||
if not item:
|
||
return 0.0
|
||
score = 0.0
|
||
for key, value in item.items():
|
||
if key in {"wxid", "display_name"}:
|
||
continue
|
||
if isinstance(value, list):
|
||
score += len([v for v in value if str(v).strip()]) * 1.0
|
||
elif isinstance(value, (int, float)):
|
||
score += float(value)
|
||
elif str(value).strip():
|
||
score += 0.8
|
||
try:
|
||
score += float(item.get("confidence", 0)) * 2
|
||
except Exception:
|
||
pass
|
||
return score
|
||
|
||
def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
|
||
if not messages:
|
||
return ""
|
||
time_groups = defaultdict(lambda: defaultdict(list))
|
||
for msg in messages:
|
||
timestamp = msg.get("timestamp")
|
||
sender = msg.get("sender", "")
|
||
content = str(msg.get("content", "")).strip()
|
||
if not sender or not content:
|
||
continue
|
||
try:
|
||
dt = datetime.strptime(str(timestamp), "%Y-%m-%d %H:%M:%S")
|
||
except Exception:
|
||
dt = None
|
||
if not dt:
|
||
continue
|
||
time_key = dt.strftime("%H:%M")
|
||
sender_name = member_name_map.get(sender, sender)
|
||
time_groups[time_key][sender_name].append(content)
|
||
|
||
lines = []
|
||
for time_key in sorted(time_groups.keys()):
|
||
lines.append(f"【{time_key}】")
|
||
for sender_name, contents in time_groups[time_key].items():
|
||
for idx, content in enumerate(contents):
|
||
if idx == 0:
|
||
lines.append(f"{sender_name}:{content}")
|
||
else:
|
||
lines.append(f" {content}")
|
||
return "\n".join(lines)
|
||
|
||
@staticmethod
|
||
def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
|
||
target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
|
||
week_start = target_date - timedelta(days=target_date.weekday())
|
||
week_end = week_start + timedelta(days=6)
|
||
week_key = f"{week_start.strftime('%Y-%m-%d')}"
|
||
return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")
|
||
|
||
@staticmethod
|
||
def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
|
||
target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
|
||
month_start = target_dt.replace(day=1)
|
||
if month_start.month == 12:
|
||
next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
|
||
else:
|
||
next_month = month_start.replace(month=month_start.month + 1, day=1)
|
||
month_end = next_month - timedelta(days=1)
|
||
month_key = month_start.strftime("%Y-%m")
|
||
return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")
|