Files
abot/admin/dashboard/blueprints/robot.py
liuwei 14aa2ba067 增强群总结弹窗查看能力
- 后端群总结接口补充完整 summary_text 字段\n- 群运营详情页新增最近群总结全文弹窗\n- 保留原有摘要卡片展示并补充类型与生成时间
2026-05-06 11:56:15 +08:00

980 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, render_template, jsonify, request, current_app, redirect
from .auth import login_required
from loguru import logger
from utils.robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
from datetime import datetime, timedelta
from db.base import BaseDBOperator
from plugins.ai_auto_response.memory.group_memory_profile import GroupMemoryService
# 创建群管理兼容蓝图(旧入口保留做跳转,实际功能已并入通讯录)
robot_bp = Blueprint('robot', __name__, url_prefix='/robot')
LOG = logger
def _build_group_ops_profile(server, group_id: str, group_name: str, peak_hours: list, plugin_stats: list) -> dict:
"""构建群运营 2.0 所需的群画像摘要。
设计说明:
1. 这里优先复用现有群画像快照与消息总结能力,不重新发明一套分析链路;
2. 若历史画像数据不足,也必须返回可展示的兜底结构,避免前端出现大片空白;
3. 输出结构专门面向后台运营视图,强调“群是什么群、最近聊什么、适合怎么运营”。
"""
profile_service = GroupMemoryService(server.db_manager, {})
profile = profile_service.build_group_memory_profile(group_id, group_name=group_name) or {}
style_profile = profile.get("style_profile", {}) or {}
focus_topics = [str(item).strip() for item in (profile.get("focus_topics") or []) if str(item).strip()]
structured_summary = profile.get("structured_summary", {}) or {}
recent_key_points = [
str(item).strip() for item in (structured_summary.get("recent_key_points") or []) if str(item).strip()
][:4]
unresolved_points = [
str(item).strip() for item in (structured_summary.get("unresolved_points") or []) if str(item).strip()
][:3]
stable_topics = [
str(item).strip() for item in (structured_summary.get("stable_topics") or []) if str(item).strip()
][:4]
peak_hour_labels = [str(item.get("label") or "").strip() for item in peak_hours if str(item.get("label") or "").strip()]
dominant_plugin_names = []
for item in plugin_stats[:3]:
plugin_name = str(item.get("plugin_name") or "").strip()
if plugin_name and plugin_name not in dominant_plugin_names:
dominant_plugin_names.append(plugin_name)
# 群定位摘要优先使用群画像服务的领域判断,再结合真实主题给出一段更像“运营结论”的描述。
inferred_domain = str(profile.get("inferred_domain") or "general").strip()
domain_label_map = {
"openclaw": "工作流 / 编排讨论型群",
"robotics": "机器人 / 自动化协作型群",
"dota": "Dota / 游戏讨论型群",
"tech": "技术排障 / 工具协作型群",
"casual": "熟人闲聊 / 日常水群型群",
"general": "综合交流型群",
}
group_identity = domain_label_map.get(inferred_domain, "综合交流型群")
profile_tags = [
tag for tag in [
group_identity,
str(style_profile.get("interaction_tone") or "").strip(),
str(style_profile.get("humor_style") or "").strip(),
str(style_profile.get("expressiveness_style") or "").strip(),
] if tag
]
summary_lines = []
if focus_topics:
summary_lines.append(f"近期高频主题集中在:{''.join(focus_topics[:4])}")
if peak_hour_labels:
summary_lines.append(f"活跃时段主要落在:{''.join(peak_hour_labels[:2])}")
if dominant_plugin_names:
summary_lines.append(f"当前最容易被接受的机器人能力偏向:{''.join(dominant_plugin_names[:2])}")
if not summary_lines:
summary_lines.append("当前历史数据仍偏少,建议先继续积累群消息、总结与插件使用记录。")
return {
"group_identity": group_identity,
"profile_tags": profile_tags,
"focus_topics": focus_topics[:6],
"stable_topics": stable_topics,
"recent_key_points": recent_key_points,
"unresolved_points": unresolved_points,
"summary_text": "".join(summary_lines),
"interaction_tone": str(style_profile.get("interaction_tone") or "").strip(),
"humor_style": str(style_profile.get("humor_style") or "").strip(),
"expressiveness_style": str(style_profile.get("expressiveness_style") or "").strip(),
"peak_hours": peak_hour_labels[:3],
"dominant_plugins": dominant_plugin_names[:3],
"cache_status": str(profile.get("cache_status") or "").strip(),
"last_generated_at": str(profile.get("last_generated_at") or "").strip(),
}
def _load_recent_group_summaries(server, group_id: str, limit: int = 3) -> list:
"""读取最近几条群总结,供运营面板直接展示。
设计说明:
1. 群画像适合给“结论”,但运营者往往还想看到最近几期总结到底说了什么;
2. 默认卡片仍使用短摘要,避免详情页被大段文本挤占;
3. 同时补充完整总结正文,供前端弹窗按需展开查看;
4. 若没有总结记录,返回空列表,由前端优雅降级。
"""
summary_db = getattr(server, "message_summary_db", None)
if summary_db is None:
return []
rows = summary_db.execute_query(
"""
SELECT period_key, summary_type, source_message_count, summary_text, last_generated_at
FROM t_message_summary
WHERE chatroom_id = %s
ORDER BY period_end DESC, update_time DESC
LIMIT %s
""",
(group_id, int(limit)),
) or []
result = []
for row in rows:
raw_summary = str(row.get("summary_text") or "").strip()
normalized_summary = " ".join(raw_summary.split())
result.append({
"period_key": str(row.get("period_key") or "").strip(),
"summary_type": str(row.get("summary_type") or "").strip(),
"source_message_count": int(row.get("source_message_count") or 0),
"last_generated_at": str(row.get("last_generated_at") or "").strip(),
# 详情弹窗需要查看完整总结正文;这里保留原文,不做截断。
"summary_text": raw_summary,
# 卡片区域继续展示压缩后的摘要,避免详情首屏信息密度过高。
"summary_excerpt": normalized_summary[:180] + ("..." if len(normalized_summary) > 180 else ""),
})
return result
def _load_value_rank_top_members(server, group_id: str, limit: int = 5) -> list:
"""读取群内最近一期高价值成员快照。
说明:
1. 该榜单直接复用已落库的身价快照,而不是运行时重新算分;
2. 若群里尚未开启或尚未产出这类数据,则静默返回空列表;
3. 返回结构会补齐成员展示名,避免前端出现一串 wxid。
"""
db = BaseDBOperator(server.db_manager)
latest_row = db.execute_query(
"""
SELECT MAX(stat_date) AS latest_stat_date
FROM t_value_rank_snapshot
WHERE group_id = %s
""",
(group_id,),
fetch_one=True,
) or {}
latest_stat_date = latest_row.get("latest_stat_date")
if not latest_stat_date:
return []
rows = db.execute_query(
"""
SELECT user_id, score, rank_no, title, points_total, msg_count_7d, active_days_30, inactive_days
FROM t_value_rank_snapshot
WHERE group_id = %s AND stat_date = %s
ORDER BY rank_no ASC
LIMIT %s
""",
(group_id, latest_stat_date, int(limit)),
) or []
result = []
for row in rows:
user_id = str(row.get("user_id") or "").strip()
result.append({
"wxid": user_id,
"display_name": server.contact_manager.get_group_name(group_id, user_id) or user_id,
"score": float(row.get("score") or 0.0),
"rank_no": int(row.get("rank_no") or 0),
"title": str(row.get("title") or "").strip(),
"points_total": int(row.get("points_total") or 0),
"msg_count_7d": int(row.get("msg_count_7d") or 0),
"active_days_30": int(row.get("active_days_30") or 0),
"inactive_days": int(row.get("inactive_days") or 0),
})
return result
def _load_activation_candidates(server, group_id: str, limit: int = 6) -> list:
"""识别“待激活成员”。
口径选择:
1. 不直接拿最沉默的人,因为那类成员很多已经处于长期静默,激活收益未必最高;
2. 这里优先找“7~30 天未发言,但仍不是完全流失”的边缘成员,更适合做召回动作;
3. 这类成员通常对低门槛互动、固定时段提醒更敏感,运营价值更高。
"""
db = BaseDBOperator(server.db_manager)
rows = db.execute_query(
"""
SELECT
wxid,
COALESCE(NULLIF(display_name, ''), nick_name, wxid) AS display_name,
latest_active_time,
TIMESTAMPDIFF(DAY, latest_active_time, NOW()) AS inactivity_days
FROM t_chatroom_member
WHERE chatroom_id = %s
AND status = 1
AND latest_active_time IS NOT NULL
AND latest_active_time < DATE_SUB(NOW(), INTERVAL 7 DAY)
AND latest_active_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY latest_active_time DESC
LIMIT %s
""",
(group_id, int(limit)),
) or []
result = []
for row in rows:
result.append({
"wxid": str(row.get("wxid") or "").strip(),
"display_name": str(row.get("display_name") or "").strip(),
"latest_active_time": str(row.get("latest_active_time") or "").strip(),
"inactivity_days": int(row.get("inactivity_days") or 0),
})
return result
def _build_ops_member_segments(server, group_id: str, speaker_ranking: list, inactive_members: list) -> dict:
"""构建群成员分层结果。
设计目标:
1. 保留原有“沉默成员 / 发言排行”表格;
2. 在此基础上额外产出“核心成员 / 高价值成员 / 待激活成员”三类运营视角;
3. 前端展示时以卡片为主,让运营者更快找到“该看谁”。
"""
value_rank_members = _load_value_rank_top_members(server, group_id, limit=5)
activation_candidates = _load_activation_candidates(server, group_id, limit=6)
member_context_map = {
str(item.get("wxid") or "").strip(): item
for item in (server.member_context_db.list_group_member_contexts(group_id) or [])
}
core_members = []
for item in speaker_ranking[:5]:
wxid = str(item.get("sender") or "").strip()
context = member_context_map.get(wxid, {}) or {}
core_members.append({
"wxid": wxid,
"display_name": str(item.get("display_name") or wxid).strip(),
"message_count": int(item.get("message_count") or 0),
"last_message_time": str(item.get("last_message_time") or "").strip(),
"activity_level": str(context.get("activity_level") or "").strip(),
"summary_text": str(context.get("summary_text") or "").strip(),
})
dormant_members = []
for item in inactive_members[:5]:
wxid = str(item.get("wxid") or "").strip()
context = member_context_map.get(wxid, {}) or {}
dormant_members.append({
"wxid": wxid,
"display_name": str(item.get("display_name") or wxid).strip(),
"inactivity_days": int(item.get("inactivity_days") or 0),
"latest_active_time": str(item.get("latest_active_time") or "").strip(),
"response_style_hint": str(context.get("response_style_hint") or "").strip(),
})
return {
"core_members": core_members,
"value_rank_members": value_rank_members,
"activation_candidates": activation_candidates,
"dormant_members": dormant_members,
}
def _build_ops_action_cards(group_name: str, ops_profile: dict, ops_members: dict, plugin_stats: list) -> list:
"""生成更偏“可执行动作”的运营建议卡片。
与旧版 `operation_suggestions` 的区别:
1. 旧版更像诊断提示;新版更强调“下一步怎么做”;
2. 仍然采用规则组合,避免引入高成本、不可控的 LLM 依赖;
3. 结构尽量稳定,方便前端做卡片展示与后续继续增强。
"""
action_cards = []
peak_hours = ops_profile.get("peak_hours") or []
focus_topics = ops_profile.get("focus_topics") or []
dominant_plugins = ops_profile.get("dominant_plugins") or []
activation_candidates = ops_members.get("activation_candidates") or []
core_members = ops_members.get("core_members") or []
if peak_hours:
action_cards.append({
"type": "time",
"title": "建议把互动动作放到高峰时段",
"summary": f"优先考虑在 {peak_hours[0]} 发起签到、话题或菜单引导。",
"detail": f"{group_name} 最近高峰主要集中在 {''.join(peak_hours[:2])},在这个时间段发起动作更容易拿到首轮反馈。",
})
if dominant_plugins:
action_cards.append({
"type": "feature",
"title": "建议优先强化已被接受的功能入口",
"summary": f"先围绕 {dominant_plugins[0]} 做菜单、公告或欢迎语曝光。",
"detail": f"当前群里真实被用起来的功能更偏向 {''.join(dominant_plugins[:3])},说明这类能力更容易被接受,适合作为下一阶段主推入口。",
})
if activation_candidates:
candidate_names = [str(item.get("display_name") or "").strip() for item in activation_candidates[:3] if str(item.get("display_name") or "").strip()]
action_cards.append({
"type": "activation",
"title": "建议优先激活边缘活跃成员",
"summary": f"可以先针对 {''.join(candidate_names) if candidate_names else '近 7~30 天边缘成员'} 做轻量召回。",
"detail": "这批成员不是长期沉默用户,仍有回流可能,更适合通过低门槛互动、固定提醒或轻福利重新拉回讨论。",
})
if focus_topics:
action_cards.append({
"type": "topic",
"title": "建议围绕当前高频主题设计话题入口",
"summary": f"优先从 {focus_topics[0]}{focus_topics[1] if len(focus_topics) > 1 else focus_topics[0]} 这类主题切入。",
"detail": "与其硬推通用活动,不如顺着群里原本就在讨论的内容做问题、投票、接龙或功能引导,转化更自然。",
})
if core_members:
core_names = [str(item.get("display_name") or "").strip() for item in core_members[:3] if str(item.get("display_name") or "").strip()]
action_cards.append({
"type": "core",
"title": "建议借核心成员带动扩散",
"summary": f"可以优先让 {''.join(core_names) if core_names else '核心活跃成员'} 参与首轮互动。",
"detail": "核心成员往往能决定首轮响应速度和讨论氛围,先让他们接住话题,比单纯增加曝光更有效。",
})
if not action_cards:
action_cards.append({
"type": "fallback",
"title": "建议先继续积累群画像与使用数据",
"summary": "当前数据不足以给出更细的动作建议。",
"detail": "可以先保持基础运营动作,等群总结、成员画像和插件使用数据更完整后,再做更细的策略推荐。",
})
return action_cards[:4]
def _build_group_list_ops_preview(server, group_id: str, group_name: str) -> dict:
"""构建群列表层可展示的轻量运营速览。
设计原则:
1. 列表页只放“筛选信号”,不放完整成员明细,避免信息密度过高;
2. 详情页里已经有完整诊断,因此这里优先输出健康度、核心成员、待激活成员等摘要;
3. 为了控制接口成本,这里不复用重型画像构建,只做轻量聚合和短摘要生成。
"""
now = datetime.now()
last_30d = now - timedelta(days=30)
member_summary = server.contact_db.get_group_member_summary(group_id, inactive_days=30)
message_stats_30d = server.message_storage.get_message_stats_by_date_range(group_id, last_30d, now)
plugin_summary = server.stats_db.get_group_plugin_summary(group_id, days=30)
speaker_ranking = server.message_storage.get_group_member_message_ranking(group_id, last_30d, now, limit=3)
activation_candidates = _load_activation_candidates(server, group_id, limit=3)
recent_summaries = _load_recent_group_summaries(server, group_id, limit=1)
total_members = member_summary.get("in_group_members") or member_summary.get("total_members") or 0
active_30d = int(member_summary.get("active_30d_members") or 0)
zombie_count = int(member_summary.get("inactive_members") or 0)
msg_30d_total = int(message_stats_30d.get("total_count") or 0)
plugin_total_calls = int(plugin_summary.get("total_calls") or 0)
# 列表页也复用和详情页一致的健康度口径:
# 1. 这样用户从列表点进详情时,数值语义不会跳变;
# 2. 逻辑保持一套,后续调分只改一处思路更稳;
# 3. 列表上只展示分值,不额外塞入复杂推导过程。
health_score = 100
if total_members > 0:
health_score -= min(35, round((zombie_count / total_members) * 35))
health_score -= min(25, round((1 - min(active_30d / total_members, 1)) * 25))
if msg_30d_total == 0:
health_score -= 25
elif msg_30d_total < 50:
health_score -= 12
if plugin_total_calls == 0:
health_score -= 10
health_score = max(0, min(100, health_score))
core_member_names = []
for item in speaker_ranking:
sender = str(item.get("sender") or "").strip()
display_name = server.contact_manager.get_group_name(group_id, sender) or sender
if display_name:
core_member_names.append(display_name)
activation_names = [
str(item.get("display_name") or "").strip()
for item in activation_candidates
if str(item.get("display_name") or "").strip()
]
summary_excerpt = ""
if recent_summaries:
summary_excerpt = str(recent_summaries[0].get("summary_excerpt") or "").strip()
if not summary_excerpt:
if core_member_names:
summary_excerpt = f"核心讨论主要由 {''.join(core_member_names[:2])} 带动。"
elif plugin_total_calls > 0:
summary_excerpt = f"近30天已有 {plugin_total_calls} 次插件调用,可继续做功能渗透。"
else:
summary_excerpt = "当前运营数据较少,建议先观察活跃与功能渗透。"
return {
"group_id": group_id,
"group_name": group_name,
"health_score": int(health_score),
"active_member_count_30d": active_30d,
"zombie_member_count": zombie_count,
"core_member_names": core_member_names[:3],
"activation_candidate_count": len(activation_names),
"activation_candidate_names": activation_names[:3],
"plugin_call_count_30d": plugin_total_calls,
"summary_excerpt": summary_excerpt[:120] + ("..." if len(summary_excerpt) > 120 else ""),
}
# 旧群权限页入口兼容跳转
@robot_bp.route('/')
@login_required
def robot_management():
return redirect('/contacts')
# API路由
@robot_bp.route('/api/groups')
@login_required
def api_robot_groups():
try:
server = current_app.dashboard_server
# 获取所有群组列表
groups = GroupBotManager.get_group_list()
# 如果方法返回None或发生异常使用本地缓存
if groups is None and hasattr(GroupBotManager, "local_cache"):
groups = GroupBotManager.local_cache.get("group_list", set())
# 如果仍然为None则初始化为空集合
if groups is None:
groups = set()
LOG.info(f"获取到 {len(groups)} 个群组")
group_data = []
for group_id in groups:
try:
# 获取群名称,如果失败则使用默认值
group_name = server.contact_manager.get_nickname(group_id)
if not group_name:
group_name = f"未知群组({group_id})"
# 获取机器人状态,如果失败则使用默认值
try:
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
except:
robot_status = PermissionStatus.DISABLED
group_data.append({
"group_id": group_id,
"group_name": group_name,
"robot_status": robot_status.value if robot_status else "unknown"
})
except Exception as e:
LOG.warning(f"处理群组 {group_id} 信息时出错: {e}")
# 添加基本信息,避免单个群组错误影响整个列表
group_data.append({
"group_id": group_id,
"group_name": "获取失败",
"robot_status": "unknown"
})
return jsonify({"success": True, "data": group_data})
except Exception as e:
LOG.error(f"获取群组列表失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/permissions')
@login_required
def api_robot_group_permissions(group_id):
try:
permissions = GroupBotManager.list_group_permissions(group_id)
permission_data = []
for feature, status in permissions.items():
permission_data.append({
"feature_id": feature.value,
"feature_name": feature.name,
"feature_description": feature.description,
"status": status.value
})
return jsonify({"success": True, "data": permission_data})
except Exception as e:
LOG.error(f"获取群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/permissions', methods=['POST'])
@login_required
def api_update_robot_permissions(group_id):
# 更新群组功能权限
server = current_app.dashboard_server
data = request.json
feature_id = data.get('feature_id')
status = data.get('status')
try:
feature = Feature(int(feature_id))
new_status = PermissionStatus(status)
# 特殊处理ROBOT功能
if feature == Feature.ROBOT:
r = server.db_manager.get_redis_connection()
if new_status == PermissionStatus.ENABLED:
GroupBotManager.local_cache["group_list"].add(group_id)
r.sadd("group:list", group_id)
else:
GroupBotManager.local_cache["group_list"].remove(group_id)
r.srem("group:list", group_id)
GroupBotManager.set_group_permission(group_id, feature, new_status)
return jsonify({"success": True})
except Exception as e:
LOG.error(f"更新群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@robot_bp.route('/api/batch_operation', methods=['POST'])
@login_required
def api_robot_batch_operation():
# 批量操作接口
data = request.json
operation = data.get('operation')
group_ids = data.get('group_ids', [])
results = {}
try:
if operation == 'remove_groups':
for group_id in group_ids:
result = GroupBotManager.remove_group(group_id)
results[group_id] = result
return jsonify({"success": True, "results": results})
else:
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
except Exception as e:
LOG.error(f"批量操作失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@robot_bp.route('/api/add_group', methods=['POST'])
@login_required
def api_add_group():
try:
server = current_app.dashboard_server
data = request.json
group_id = data.get('group_id')
if not group_id or not group_id.strip():
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
group_id = group_id.strip()
# 如果group_id 不是@chatroom 结尾,这提示错误
if not group_id.endswith("@chatroom"):
return jsonify({"success": False, "error": "群组ID必须以 @chatroom 结尾"}), 400
# 检查群组是否已存在
if group_id in GroupBotManager.local_cache["group_list"]:
return jsonify({"success": False, "error": "该群组已存在"}), 400
# 添加群组到列表并启用机器人功能
GroupBotManager.local_cache["group_list"].add(group_id)
r = server.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群组名称(如果可能)
group_name = server.contact_manager.get_nickname(group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 已成功添加",
"group": {
"group_id": group_id,
"group_name": group_name,
"robot_status": "enabled"
}
})
except Exception as e:
LOG.error(f"添加群组失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/message_trend')
@login_required
def api_group_message_trend(group_id):
try:
server = current_app.dashboard_server
days = request.args.get('days', 7, type=int)
trend_data = server.message_storage.get_message_trend(group_id, days)
# 格式化数据为前端需要的格式
dates = []
counts = []
for item in trend_data:
# 将日期转换为字符串
if isinstance(item['date'], datetime):
date_str = item['date'].strftime('%Y-%m-%d')
else:
date_str = str(item['date'])
dates.append(date_str)
counts.append(item['message_count'])
return jsonify({
'success': True,
'data': {
'dates': dates,
'counts': counts
}
})
except Exception as e:
LOG.error(f"获取群组消息趋势数据出错: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@robot_bp.route('/api/groups/ops_preview', methods=['POST'])
@login_required
def api_groups_ops_preview():
"""批量获取群列表层的运营速览摘要。"""
try:
server = current_app.dashboard_server
data = request.json or {}
group_ids = data.get("group_ids") or []
# 列表批量接口只接受有限数量群组:
# 1. 避免一次把全量群的详情聚合全算一遍;
# 2. 通讯录列表本身就有分页/筛选,首版以“当前批次可见群”为主;
# 3. 这里做硬上限,防止误调用把后台拖慢。
cleaned_group_ids = []
for item in group_ids:
group_id = str(item or "").strip()
if group_id and group_id not in cleaned_group_ids:
cleaned_group_ids.append(group_id)
cleaned_group_ids = cleaned_group_ids[:30]
preview_map = {}
for group_id in cleaned_group_ids:
group_name = server.contact_manager.get_nickname(group_id) or group_id
preview_map[group_id] = _build_group_list_ops_preview(server, group_id, group_name)
return jsonify({
"success": True,
"data": {
"preview_map": preview_map,
}
})
except Exception as e:
LOG.error(f"批量获取群运营速览失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/detail')
@login_required
def api_group_detail(group_id):
"""群组详情聚合接口"""
try:
server = current_app.dashboard_server
now = datetime.now()
last_24h = now - timedelta(days=1)
last_7d = now - timedelta(days=7)
last_30d = now - timedelta(days=30)
group_name = server.contact_manager.get_nickname(group_id) or group_id
permission_map = GroupBotManager.list_group_permissions(group_id)
enabled_features = [
feature.description
for feature, status in permission_map.items()
if status == PermissionStatus.ENABLED
]
member_summary = server.contact_db.get_group_member_summary(group_id, inactive_days=30)
inactive_members = server.contact_db.get_inactive_members_rank(group_id, days=30, limit=8)
message_stats_24h = server.message_storage.get_message_stats_by_date_range(group_id, last_24h, now)
message_stats_7d = server.message_storage.get_message_stats_by_date_range(group_id, last_7d, now)
message_stats_30d = server.message_storage.get_message_stats_by_date_range(group_id, last_30d, now)
speaker_ranking = server.message_storage.get_group_member_message_ranking(group_id, last_30d, now, limit=10)
plugin_summary = server.stats_db.get_group_plugin_summary(group_id, days=30)
plugin_stats = server.stats_db.get_group_plugin_stats(group_id, days=30, limit=8)
last_message = server.message_storage.get_group_last_message(group_id)
trend_data = server.message_storage.get_message_trend(group_id, 14)
hourly_distribution = server.message_storage.get_group_hourly_distribution(group_id, 30)
for item in inactive_members:
wxid = item.get("wxid")
item["display_name"] = server.contact_manager.get_group_name(group_id, wxid) or item.get("nick_name") or wxid
for item in speaker_ranking:
sender = item.get("sender")
item["display_name"] = server.contact_manager.get_group_name(group_id, sender) or sender
total_members = member_summary.get("in_group_members") or member_summary.get("total_members") or 0
active_30d = member_summary.get("active_30d_members", 0)
zombie_count = member_summary.get("inactive_members", 0)
msg_30d_total = message_stats_30d.get("total_count", 0)
plugin_total_calls = plugin_summary.get("total_calls", 0)
health_score = 100
if total_members > 0:
health_score -= min(35, round((zombie_count / total_members) * 35))
health_score -= min(25, round((1 - min(active_30d / total_members, 1)) * 25))
if msg_30d_total == 0:
health_score -= 25
elif msg_30d_total < 50:
health_score -= 12
if plugin_total_calls == 0:
health_score -= 10
health_score = max(0, min(100, health_score))
trend_dates = []
trend_counts = []
for item in trend_data:
date_val = item.get("date")
if isinstance(date_val, datetime):
date_str = date_val.strftime('%Y-%m-%d')
else:
date_str = str(date_val)
trend_dates.append(date_str)
trend_counts.append(int(item.get("message_count") or 0))
hourly_distribution_sorted = sorted(
hourly_distribution,
key=lambda x: x.get("message_count", 0),
reverse=True
)
peak_hours = [
{
"hour": item.get("hour", 0),
"label": f"{int(item.get('hour', 0)):02d}:00-{(int(item.get('hour', 0)) + 1) % 24:02d}:00",
"message_count": item.get("message_count", 0),
}
for item in hourly_distribution_sorted[:3]
if item.get("message_count", 0) > 0
]
plugin_success_rate = (
round(plugin_summary.get("success_calls", 0) / plugin_total_calls * 100, 1)
if plugin_total_calls else 0
)
zombie_ratio = round((zombie_count / total_members) * 100, 1) if total_members else 0
active_ratio = round((active_30d / total_members) * 100, 1) if total_members else 0
operation_suggestions = []
if zombie_ratio >= 50:
operation_suggestions.append({
"level": "danger",
"title": "僵尸成员偏多,建议分层清理或召回",
"desc": f"当前僵尸成员占比 {zombie_ratio}% ,建议优先筛查 30 天未发言成员,结合群定位决定清理、激活或转移。"
})
elif zombie_ratio >= 30:
operation_suggestions.append({
"level": "warning",
"title": "沉默成员较多,建议做激活动作",
"desc": f"当前僵尸成员占比 {zombie_ratio}% ,可以通过签到、抽奖、任务或话题互动做一次激活。"
})
if active_ratio < 20:
operation_suggestions.append({
"level": "danger",
"title": "活跃覆盖率偏低,群内容可能只被少数人消费",
"desc": f"近30天活跃覆盖率仅 {active_ratio}% ,建议增加低门槛互动,如投票、签到、接龙、关键词触发。"
})
elif active_ratio < 40:
operation_suggestions.append({
"level": "warning",
"title": "活跃度中等,可继续扩大参与面",
"desc": f"近30天活跃覆盖率为 {active_ratio}% ,建议在高峰时段安排固定互动,提高更多成员参与率。"
})
if plugin_total_calls == 0:
operation_suggestions.append({
"level": "info",
"title": "插件渗透率为 0建议补一次功能引导",
"desc": "这个群近30天没有插件调用建议置顶菜单、欢迎语或群公告里给出可用指令示例。"
})
elif plugin_summary.get("plugin_count", 0) <= 2:
operation_suggestions.append({
"level": "info",
"title": "插件使用面偏窄,可以扩展场景",
"desc": f"近30天仅使用了 {plugin_summary.get('plugin_count', 0)} 个插件,建议根据群定位补充娱乐、工具或运营型插件。"
})
if plugin_total_calls > 0 and plugin_success_rate < 85:
operation_suggestions.append({
"level": "warning",
"title": "插件成功率偏低,建议检查指令体验",
"desc": f"近30天插件成功率 {plugin_success_rate}% ,可排查失败插件、异常命令和提示文案是否清晰。"
})
if msg_30d_total < 30:
operation_suggestions.append({
"level": "info",
"title": "群消息基数偏低,建议降低互动门槛",
"desc": f"近30天消息量仅 {msg_30d_total} 条,更适合短反馈、轻任务、固定时间提醒等低成本互动。"
})
if not operation_suggestions:
operation_suggestions.append({
"level": "success",
"title": "群整体状态健康,可持续做精细化运营",
"desc": "当前活跃、消息量、插件调用都比较均衡,建议继续围绕高峰时段和高频插件做强化。"
})
# 群运营分析 2.0 采用“增量增强”策略:
# 1. 现有 overview / diagnosis / operation_suggestions 等结构继续保留,避免影响旧页面;
# 2. 新增 ops_profile / ops_members / ops_actions 三类字段,供现有详情页逐步增强使用;
# 3. 即使某些增强数据暂时为空,也不应影响老内容展示。
ops_profile = _build_group_ops_profile(
server=server,
group_id=group_id,
group_name=group_name,
peak_hours=peak_hours,
plugin_stats=plugin_stats,
)
ops_members = _build_ops_member_segments(
server=server,
group_id=group_id,
speaker_ranking=speaker_ranking,
inactive_members=inactive_members,
)
recent_summaries = _load_recent_group_summaries(server, group_id, limit=3)
ops_actions = _build_ops_action_cards(
group_name=group_name,
ops_profile=ops_profile,
ops_members=ops_members,
plugin_stats=plugin_stats,
)
return jsonify({
"success": True,
"data": {
"group_id": group_id,
"group_name": group_name,
"health_score": health_score,
"overview": {
"member_count": total_members,
"active_member_count_7d": member_summary.get("active_7d_members", 0),
"active_member_count_30d": active_30d,
"zombie_member_count": zombie_count,
"never_spoken_count": member_summary.get("never_spoken_members", 0),
"message_count_24h": message_stats_24h.get("total_count", 0),
"message_count_7d": message_stats_7d.get("total_count", 0),
"message_count_30d": msg_30d_total,
"plugin_call_count_30d": plugin_total_calls,
"plugin_count_30d": plugin_summary.get("plugin_count", 0),
},
"diagnosis": [
{
"title": "活跃覆盖率",
"value": f"{round((active_30d / total_members) * 100, 1) if total_members else 0}%",
"desc": "近30天发言成员占全群人数比例"
},
{
"title": "僵尸用户占比",
"value": f"{round((zombie_count / total_members) * 100, 1) if total_members else 0}%",
"desc": "30天未发言或从未发言成员占比"
},
{
"title": "人均消息量",
"value": round(msg_30d_total / active_30d, 1) if active_30d else 0,
"desc": "近30天活跃成员的人均发言量"
},
{
"title": "插件渗透度",
"value": f"{plugin_summary.get('plugin_count', 0)} 个插件",
"desc": "近30天在本群真实发生调用的插件数"
}
],
"permissions": {
"enabled_count": len(enabled_features),
"enabled_features": enabled_features,
},
"member_summary": member_summary,
"message_summary": {
"last_message": last_message,
"last_24h": message_stats_24h,
"last_7d": message_stats_7d,
"last_30d": message_stats_30d,
"type_mix_30d": [
{"label": "文本", "count": message_stats_30d.get("text_count", 0)},
{"label": "图片", "count": message_stats_30d.get("image_count", 0)},
{"label": "视频", "count": message_stats_30d.get("video_count", 0)},
{"label": "链接", "count": message_stats_30d.get("link_count", 0)},
{"label": "表情", "count": message_stats_30d.get("emoji_count", 0)},
],
"trend_14d": {
"dates": trend_dates,
"counts": trend_counts
},
"hourly_distribution_30d": hourly_distribution,
"peak_hours_30d": peak_hours,
},
"inactive_members": inactive_members,
"speaker_ranking": speaker_ranking,
"plugin_summary": plugin_summary,
"plugin_stats": plugin_stats,
"operation_suggestions": operation_suggestions,
"ops_profile": ops_profile,
"ops_members": ops_members,
"ops_actions": ops_actions,
"recent_summaries": recent_summaries,
}
})
except Exception as e:
LOG.error(f"获取群组详情失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 添加缺失的群组状态更新接口
@robot_bp.route('/api/group/<group_id>/status', methods=['POST'])
@login_required
def api_update_group_status(group_id):
try:
server = current_app.dashboard_server
data = request.json
status = data.get('status')
if status == 'disabled':
# 禁用该群组的所有功能
LOG.info(f"正在禁用群组 {group_id} 的所有功能")
# 获取所有功能并禁用
for feature in Feature:
GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.DISABLED)
# 特殊处理ROBOT功能从群组列表中移除
if group_id in GroupBotManager.local_cache["group_list"]:
GroupBotManager.local_cache["group_list"].remove(group_id)
# 从Redis中移除
r = server.db_manager.get_redis_connection()
r.srem("group:list", group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 的所有功能已禁用"
})
elif status == 'enabled':
# 启用该群组的基本功能
LOG.info(f"正在启用群组 {group_id} 的基本功能")
# 添加到群组列表
if group_id not in GroupBotManager.local_cache["group_list"]:
GroupBotManager.local_cache["group_list"].add(group_id)
# 添加到Redis
r = server.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 启用ROBOT基本功能
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
return jsonify({
"success": True,
"message": f"群组 {group_id} 的基本功能已启用"
})
else:
return jsonify({
"success": False,
"error": "不支持的状态值,只接受 'enabled''disabled'"
}), 400
except Exception as e:
LOG.error(f"更新群组状态失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500