Files
abot/db/stats_db.py
liuwei d9ce3ce33e 修复仪表盘摘要接口Decimal类型计算报错
变更项:

1. 修复 /api/dashboard_summary 中 success_rate 为 Decimal 与 float 混算导致的异常。

2. 将 success_rate 与 avg_response_time 统一转换为 float,避免后续健康分计算类型冲突。

3. 恢复首页热门信息接口返回,避免因接口失败导致榜单空白。
2026-04-15 17:28:55 +08:00

786 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Tuple
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class StatsDBOperator(BaseDBOperator):
"""统计数据库操作类"""
def __init__(self, db_manager: DBConnectionManager):
    """Initialise the operator on top of the given connection manager.

    Args:
        db_manager: Connection manager handed to the base class initialiser.
    """
    super().__init__(db_manager)
    # Flipped to True once the group/user dedup tracker table has been
    # created, so the CREATE TABLE statement is not re-issued on every call.
    self._group_user_tracker_ready = False
def _ensure_group_user_tracker_table(self) -> bool:
"""确保群维度唯一用户追踪表存在"""
if self._group_user_tracker_ready:
return True
sql = """
CREATE TABLE IF NOT EXISTS t_group_command_user_stats (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
group_id VARCHAR(50) NOT NULL COMMENT '群组ID',
plugin_name VARCHAR(50) NOT NULL COMMENT '插件名称',
command VARCHAR(50) NOT NULL COMMENT '触发命令',
user_id VARCHAR(50) NOT NULL COMMENT '用户ID',
first_used_at DATETIME NOT NULL COMMENT '首次触发时间',
last_used_at DATETIME NOT NULL COMMENT '最近触发时间',
UNIQUE KEY uk_group_plugin_command_user (group_id, plugin_name, command, user_id),
INDEX idx_group_plugin_command (group_id, plugin_name, command),
INDEX idx_last_used_at (last_used_at)
) COMMENT='群命令用户去重追踪表'
"""
ok = self.execute_update(sql)
if ok:
self._group_user_tracker_ready = True
return ok
def record_plugin_call(self, plugin_name: str, command: str, user_id: str,
                       group_id: Optional[str], success: bool,
                       process_time_ms: float) -> bool:
    """Record one plugin invocation in the statistics tables.

    Updates the per-plugin daily summary, the per-user usage row and, for
    group chats, the per-group usage row. Every step is attempted even if an
    earlier one fails, so a single failing table does not block the others.

    Args:
        plugin_name: Name of the plugin that handled the call.
        command: Command that triggered the plugin.
        user_id: ID of the calling user.
        group_id: Group ID, or None/empty for a private chat.
        success: Whether the plugin call succeeded.
        process_time_ms: Processing time in milliseconds.

    Returns:
        True only if every attempted update reported success.
    """
    is_group = bool(group_id)

    # Build the list eagerly so that all steps run regardless of failures.
    outcomes = [
        # 1. Per-plugin daily summary.
        self._update_plugin_stats(plugin_name, command, success,
                                  process_time_ms, is_group),
        # 2. Per-user usage.
        self._update_user_stats(user_id, plugin_name, command, success),
    ]
    # 3. Per-group usage, group chats only.
    if is_group:
        outcomes.append(
            self._update_group_stats(group_id, plugin_name, command,
                                     user_id, success)
        )
    return all(outcomes)
def record_error(self, plugin_name: str, command: str, user_id: str,
                 group_id: Optional[str], error_message: str,
                 stack_trace: Optional[str] = None) -> bool:
    """Insert one row into the error log table.

    Args:
        plugin_name: Name of the plugin that failed.
        command: Command that triggered the plugin.
        user_id: ID of the calling user.
        group_id: Group ID, or None for a private chat.
        error_message: Human-readable error message.
        stack_trace: Optional stack trace text.

    Returns:
        The result of the INSERT statement execution.
    """
    insert_sql = """
        INSERT INTO t_error_logs
        (plugin_name, command, user_id, group_id, error_message, stack_trace)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
    row_values = (
        plugin_name,
        command,
        user_id,
        group_id,
        error_message,
        stack_trace,
    )
    return self.execute_update(insert_sql, row_values)
def _update_plugin_stats(self, plugin_name: str, command: str, success: bool,
process_time_ms: float, is_group: bool) -> bool:
"""更新插件统计汇总表
Args:
plugin_name: 插件名称
command: 触发的命令
success: 是否调用成功
process_time_ms: 处理时间(毫秒)
is_group: 是否群聊
Returns:
是否成功更新
"""
today = datetime.now().strftime("%Y-%m-%d")
# 先查询当前记录
query_sql = """
SELECT id, total_calls, success_calls, failed_calls, group_calls,
private_calls, avg_process_time
FROM t_plugin_stats
WHERE plugin_name = %s AND command = %s AND stat_date = %s
"""
query_params = (plugin_name, command, today)
result = self.execute_query(query_sql, query_params, fetch_one=True)
if result:
# 更新现有记录
total_calls = result['total_calls'] + 1
success_calls = result['success_calls'] + (1 if success else 0)
failed_calls = result['failed_calls'] + (0 if success else 1)
group_calls = result['group_calls'] + (1 if is_group else 0)
private_calls = result['private_calls'] + (0 if is_group else 1)
# 计算新的平均处理时间
old_avg = result['avg_process_time']
new_avg = ((old_avg * (total_calls - 1)) + process_time_ms) / total_calls
update_sql = """
UPDATE t_plugin_stats
SET total_calls = %s,
success_calls = %s,
failed_calls = %s,
group_calls = %s,
private_calls = %s,
avg_process_time = %s
WHERE id = %s
"""
update_params = (total_calls, success_calls, failed_calls, group_calls,
private_calls, new_avg, result['id'])
return self.execute_update(update_sql, update_params)
else:
# 插入新记录
insert_sql = """
INSERT INTO t_plugin_stats
(plugin_name, command, stat_date, total_calls, success_calls,
failed_calls, group_calls, private_calls, avg_process_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_params = (
plugin_name, command, today, 1,
1 if success else 0,
0 if success else 1,
1 if is_group else 0,
0 if is_group else 1,
process_time_ms
)
return self.execute_update(insert_sql, insert_params)
def _update_user_stats(self, user_id: str, plugin_name: str,
command: str, success: bool) -> bool:
"""更新用户使用统计表
Args:
user_id: 用户ID
plugin_name: 插件名称
command: 触发的命令
success: 是否调用成功
Returns:
是否成功更新
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 查询是否存在记录
query_sql = """
SELECT id, total_calls, success_calls, failed_calls, first_used_at
FROM t_user_stats
WHERE user_id = %s AND plugin_name = %s AND command = %s
"""
query_params = (user_id, plugin_name, command)
result = self.execute_query(query_sql, query_params, fetch_one=True)
if result:
# 更新现有记录
update_sql = """
UPDATE t_user_stats
SET total_calls = total_calls + 1,
success_calls = success_calls + %s,
failed_calls = failed_calls + %s,
last_used_at = %s
WHERE id = %s
"""
update_params = (1 if success else 0, 0 if success else 1, now, result['id'])
return self.execute_update(update_sql, update_params)
else:
# 插入新记录
insert_sql = """
INSERT INTO t_user_stats
(user_id, plugin_name, command, total_calls, success_calls,
failed_calls, first_used_at, last_used_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_params = (
user_id, plugin_name, command, 1,
1 if success else 0,
0 if success else 1,
now, now
)
return self.execute_update(insert_sql, insert_params)
def _update_group_stats(self, group_id: str, plugin_name: str,
command: str, user_id: str, success: bool) -> bool:
"""更新群组使用统计表
Args:
group_id: 群组ID
plugin_name: 插件名称
command: 触发的命令
user_id: 用户ID
success: 是否调用成功
Returns:
是否成功更新
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 查询是否存在记录
query_sql = """
SELECT id, total_calls, success_calls, failed_calls, unique_users, first_used_at
FROM t_group_stats
WHERE group_id = %s AND plugin_name = %s AND command = %s
"""
query_params = (group_id, plugin_name, command)
result = self.execute_query(query_sql, query_params, fetch_one=True)
# 记录“群 + 插件 + 命令 + 用户”去重关系,再计算该群该命令唯一用户数
unique_users = 1
if self._ensure_group_user_tracker_table():
upsert_unique_user_sql = """
INSERT INTO t_group_command_user_stats
(group_id, plugin_name, command, user_id, first_used_at, last_used_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
last_used_at = VALUES(last_used_at)
"""
self.execute_update(
upsert_unique_user_sql,
(group_id, plugin_name, command, user_id, now, now)
)
user_query_sql = """
SELECT COUNT(*) as user_count
FROM t_group_command_user_stats
WHERE group_id = %s AND plugin_name = %s AND command = %s
"""
user_query_params = (group_id, plugin_name, command)
user_result = self.execute_query(user_query_sql, user_query_params, fetch_one=True)
unique_users = user_result['user_count'] if user_result and user_result.get('user_count') is not None else 1
elif result:
unique_users = max(result.get('unique_users') or 0, 1)
if result:
# 更新现有记录
update_sql = """
UPDATE t_group_stats
SET total_calls = total_calls + 1,
success_calls = success_calls + %s,
failed_calls = failed_calls + %s,
unique_users = %s,
last_used_at = %s
WHERE id = %s
"""
update_params = (1 if success else 0, 0 if success else 1,
unique_users, now, result['id'])
return self.execute_update(update_sql, update_params)
else:
# 插入新记录
insert_sql = """
INSERT INTO t_group_stats
(group_id, plugin_name, command, total_calls, success_calls,
failed_calls, unique_users, first_used_at, last_used_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_params = (
group_id, plugin_name, command, 1,
1 if success else 0,
0 if success else 1,
unique_users, now, now
)
return self.execute_update(insert_sql, insert_params)
def get_plugin_stats(self, days: int = 7) -> List[Dict]:
    """Return per-plugin/command usage totals for the recent period.

    Args:
        days: Number of days to look back from the current date.

    Returns:
        Rows grouped by plugin and command, ordered by total calls
        descending; an empty list when the query yields nothing.
    """
    stats_sql = """
        SELECT plugin_name, command,
        SUM(total_calls) as total_calls,
        SUM(success_calls) as success_calls,
        SUM(failed_calls) as failed_calls,
        AVG(avg_process_time) as avg_process_time
        FROM t_plugin_stats
        WHERE stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        GROUP BY plugin_name, command
        ORDER BY total_calls DESC
        """
    rows = self.execute_query(stats_sql, (days,))
    return rows if rows else []
def get_user_stats(self, days: int = 7, limit: int = 20) -> List[Dict]:
    """Return per-user usage totals for recently active users.

    Args:
        days: Number of days to look back from the current date.
        limit: Maximum number of rows to return.

    Returns:
        Rows grouped by user, ordered by total calls descending; an empty
        list when the query yields nothing.
    """
    stats_sql = """
        SELECT user_id,
        SUM(total_calls) as total_calls,
        SUM(success_calls) as success_calls,
        SUM(failed_calls) as failed_calls,
        COUNT(DISTINCT plugin_name) as used_plugins
        FROM t_user_stats
        WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        GROUP BY user_id
        ORDER BY total_calls DESC
        LIMIT %s
        """
    rows = self.execute_query(stats_sql, (days, limit))
    return rows if rows else []
def get_group_stats(self, days: int = 7, limit: int = 20) -> List[Dict]:
    """Return per-group usage totals for recently active groups.

    Args:
        days: Number of days to look back from the current date.
        limit: Maximum number of rows to return.

    Returns:
        Rows grouped by group, ordered by total calls descending; an empty
        list when the query yields nothing.
    """
    stats_sql = """
        SELECT group_id,
        SUM(total_calls) as total_calls,
        SUM(success_calls) as success_calls,
        SUM(failed_calls) as failed_calls,
        COUNT(DISTINCT plugin_name) as used_plugins,
        MAX(unique_users) as unique_users
        FROM t_group_stats
        WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        GROUP BY group_id
        ORDER BY total_calls DESC
        LIMIT %s
        """
    rows = self.execute_query(stats_sql, (days, limit))
    return rows if rows else []
def get_error_logs(self, days: int = 7, page: int = 1, limit: int = 20) -> Tuple[List[Dict], int]:
    """Return one page of recent error logs together with the total count.

    Args:
        days: Number of days to look back from the current date.
        page: 1-based page number; values below 1 are treated as page 1.
        limit: Page size; negative values are treated as 0.

    Returns:
        A ``(logs, total)`` tuple: the rows of the requested page (newest
        first) and the total number of matching rows.
    """
    # Clamp paging arguments: a negative LIMIT/OFFSET is a SQL error.
    page = max(page, 1)
    limit = max(limit, 0)

    # 1. Total number of matching rows.
    count_sql = """
        SELECT COUNT(*) as total
        FROM t_error_logs
        WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        """
    count_row = self.execute_query(count_sql, (days,), fetch_one=True)
    total = count_row['total'] if count_row else 0

    # 2. Rows of the requested page.
    offset = (page - 1) * limit
    page_sql = """
        SELECT id, plugin_name, command, user_id, group_id,
        error_message, stack_trace, created_at
        FROM t_error_logs
        WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        ORDER BY created_at DESC
        LIMIT %s OFFSET %s
        """
    logs = self.execute_query(page_sql, (days, limit, offset)) or []
    return logs, total
def get_error_detail(self, error_id: int) -> Optional[Dict]:
    """Fetch a single error log row by its primary key.

    Args:
        error_id: ID of the error log row.

    Returns:
        The result of the single-row query for that ID.
    """
    detail_sql = """
        SELECT id, plugin_name, command, user_id, group_id,
        error_message, stack_trace, created_at
        FROM t_error_logs
        WHERE id = %s
        """
    lookup_key = (error_id,)
    return self.execute_query(detail_sql, lookup_key, fetch_one=True)
def get_dashboard_summary(self, days: int = 7) -> Dict:
    """Build the summary data shown on the dashboard.

    All numeric aggregates are converted to plain ``int``/``float`` so that
    callers never have to mix database ``Decimal`` values with floats.

    Args:
        days: Number of days to look back from now.

    Returns:
        A dict with the keys ``total_calls``, ``success_rate``,
        ``active_users``, ``new_users``, ``active_groups``, ``error_count``,
        ``avg_response_time``, ``avg_group_penetration``,
        ``group_health_score``, ``top_plugins``, ``top_users`` and
        ``top_groups``.
    """
    # Start of the statistics window (date part only).
    start_date_str = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    window = (start_date_str,)

    def fetch_value(sql, column):
        """Run a single-row windowed query and return one column (or None)."""
        row = self.execute_query(sql, window, fetch_one=True)
        return row.get(column) if row else None

    def fetch_top(windowed_sql, fallback_sql):
        """Return the windowed ranking, or the all-time ranking if empty."""
        return (
            self.execute_query(windowed_sql, window)
            or self.execute_query(fallback_sql)
            or []
        )

    # 1 + 2. Total calls and success rate (both come from the same aggregate).
    calls_sql = """
        SELECT SUM(success_calls) as success_calls, SUM(total_calls) as total_calls
        FROM t_plugin_stats
        WHERE stat_date >= %s
        """
    calls_row = self.execute_query(calls_sql, window, fetch_one=True) or {}
    total_calls = int(calls_row.get('total_calls') or 0)
    success_calls = int(calls_row.get('success_calls') or 0)
    success_rate = (success_calls * 100.0 / total_calls) if total_calls else 0.0

    # 3. Active users.
    active_users = fetch_value("""
        SELECT COUNT(DISTINCT user_id) as active_users
        FROM t_user_stats
        WHERE last_used_at >= %s
        """, 'active_users') or 0

    # 3.1 New users (first trigger falls inside the window).
    new_users = fetch_value("""
        SELECT COUNT(DISTINCT user_id) as new_users
        FROM t_user_stats
        WHERE first_used_at >= %s
        """, 'new_users') or 0

    # 4. Active groups.
    active_groups = fetch_value("""
        SELECT COUNT(DISTINCT group_id) as active_groups
        FROM t_group_stats
        WHERE last_used_at >= %s
        """, 'active_groups') or 0

    # 5. Error count.
    error_count = fetch_value("""
        SELECT COUNT(*) as error_count
        FROM t_error_logs
        WHERE created_at >= %s
        """, 'error_count') or 0

    # 6. Average response time.
    raw_avg_response = fetch_value("""
        SELECT AVG(avg_process_time) as avg_response_time
        FROM t_plugin_stats
        WHERE stat_date >= %s
        """, 'avg_response_time')
    avg_response_time = float(raw_avg_response) if raw_avg_response else 0.0

    # 6.1 Group penetration: mean over active groups of
    #     (triggering users / group members), as a percentage.
    raw_penetration = fetch_value("""
        SELECT AVG(group_penetration) AS avg_group_penetration
        FROM (
        SELECT
        gs.group_id,
        CASE
        WHEN gm.member_count > 0
        THEN (LEAST(gs.unique_users, gm.member_count) / gm.member_count) * 100
        ELSE NULL
        END AS group_penetration
        FROM (
        SELECT group_id, MAX(unique_users) AS unique_users
        FROM t_group_stats
        WHERE last_used_at >= %s
        GROUP BY group_id
        ) gs
        LEFT JOIN (
        SELECT chatroom_id AS group_id, COUNT(*) AS member_count
        FROM t_chatroom_member
        WHERE status = 1 OR status IS NULL
        GROUP BY chatroom_id
        ) gm ON gs.group_id = gm.group_id
        ) t
        WHERE group_penetration IS NOT NULL
        """, 'avg_group_penetration')
    avg_group_penetration = float(raw_penetration) if raw_penetration is not None else 0.0

    # 6.2 Health score: 70% success rate + 30% latency score.
    # Latency score is 100 at or below 500 ms, 0 at or above 5000 ms and
    # linear in between.
    if avg_response_time <= 500:
        latency_score = 100.0
    elif avg_response_time >= 5000:
        latency_score = 0.0
    else:
        latency_score = (5000 - avg_response_time) / 4500 * 100
    group_health_score = (success_rate * 0.7) + (latency_score * 0.3)

    # 7. Most used plugins (window first, all-time as fallback).
    top_plugins = fetch_top(
        """
        SELECT plugin_name, SUM(total_calls) as total_calls
        FROM t_plugin_stats
        WHERE stat_date >= %s
        GROUP BY plugin_name
        ORDER BY total_calls DESC
        LIMIT 5
        """,
        """
        SELECT plugin_name, SUM(total_calls) as total_calls
        FROM t_plugin_stats
        GROUP BY plugin_name
        ORDER BY total_calls DESC
        LIMIT 5
        """,
    )

    # 8. Most active users (window first, all-time as fallback).
    top_users = fetch_top(
        """
        SELECT user_id, SUM(total_calls) as total_calls
        FROM t_user_stats
        WHERE last_used_at >= %s
        GROUP BY user_id
        ORDER BY total_calls DESC
        LIMIT 5
        """,
        """
        SELECT user_id, SUM(total_calls) as total_calls
        FROM t_user_stats
        GROUP BY user_id
        ORDER BY total_calls DESC
        LIMIT 5
        """,
    )

    # 9. Most active groups (window first, all-time as fallback).
    top_groups = fetch_top(
        """
        SELECT group_id, SUM(total_calls) as total_calls
        FROM t_group_stats
        WHERE last_used_at >= %s
        GROUP BY group_id
        ORDER BY total_calls DESC
        LIMIT 5
        """,
        """
        SELECT group_id, SUM(total_calls) as total_calls
        FROM t_group_stats
        GROUP BY group_id
        ORDER BY total_calls DESC
        LIMIT 5
        """,
    )

    return {
        "total_calls": total_calls,
        "success_rate": success_rate,
        "active_users": active_users,
        "new_users": new_users,
        "active_groups": active_groups,
        "error_count": error_count,
        "avg_response_time": avg_response_time,
        "avg_group_penetration": round(avg_group_penetration, 2),
        "group_health_score": round(group_health_score, 2),
        "top_plugins": top_plugins,
        "top_users": top_users,
        "top_groups": top_groups,
    }
def get_plugin_trend(self, plugin_name: str = "", days: int = 7) -> List[Dict]:
    """Return a day-by-day usage trend, with gaps filled by zero entries.

    Args:
        plugin_name: Plugin to report on; an empty string aggregates all
            plugins.
        days: Number of days to cover, ending with (and including) today.

    Returns:
        One dict per day in ascending date order with the keys ``date``
        (``YYYY-MM-DD``), ``total_calls``, ``success_calls``,
        ``failed_calls`` (ints) and ``success_rate`` (percentage). The list
        is empty when ``days`` is not positive.
    """
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=days - 1)  # today is included

    # Only the WHERE clause differs between the single-plugin and the
    # all-plugins query; user input is still passed as bound parameters.
    if plugin_name:
        where_clause = "plugin_name = %s AND stat_date >= %s"
        params = (plugin_name, start_date)
    else:
        where_clause = "stat_date >= %s"
        params = (start_date,)
    sql = f"""
        SELECT stat_date, SUM(total_calls) as total_calls,
        SUM(success_calls) as success_calls,
        SUM(failed_calls) as failed_calls
        FROM t_plugin_stats
        WHERE {where_clause}
        GROUP BY stat_date
        ORDER BY stat_date
        """
    rows = self.execute_query(sql, params) or []

    def day_key(value):
        """Normalise a stat_date value (date object or string) to YYYY-MM-DD."""
        if hasattr(value, 'strftime'):
            return value.strftime('%Y-%m-%d')
        return str(value)[:10]

    rows_by_day = {day_key(row['stat_date']): row for row in rows}

    # Emit one entry per day so that the chart has no holes.
    trend_data = []
    for offset in range(max(days, 0)):
        date_str = (start_date + timedelta(days=offset)).strftime('%Y-%m-%d')
        row = rows_by_day.get(date_str)
        # SUM() values may arrive as Decimal; convert so that days with data
        # have the same plain numeric types as the zero-filled days.
        total = int(row['total_calls'] or 0) if row else 0
        succeeded = int(row['success_calls'] or 0) if row else 0
        failed = int(row['failed_calls'] or 0) if row else 0
        trend_data.append({
            'date': date_str,
            'total_calls': total,
            'success_calls': succeeded,
            'failed_calls': failed,
            'success_rate': (succeeded * 100.0 / total) if total > 0 else 0.0,
        })
    return trend_data
def get_group_plugin_stats(self, group_id: str, days: int = 30, limit: int = 10) -> List[Dict]:
    """Return per-plugin/command call statistics for one group.

    Args:
        group_id: ID of the group.
        days: Number of days to look back from now.
        limit: Maximum number of rows to return.

    Returns:
        Rows ordered by total calls (then last use) descending, with
        ``first_used_at``/``last_used_at`` datetimes rendered as
        ``YYYY-MM-DD HH:MM:SS`` strings.
    """
    stats_sql = """
        SELECT
        plugin_name,
        command,
        SUM(total_calls) AS total_calls,
        SUM(success_calls) AS success_calls,
        SUM(failed_calls) AS failed_calls,
        MAX(unique_users) AS unique_users,
        MIN(first_used_at) AS first_used_at,
        MAX(last_used_at) AS last_used_at
        FROM t_group_stats
        WHERE group_id = %s
        AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
        GROUP BY plugin_name, command
        ORDER BY total_calls DESC, last_used_at DESC
        LIMIT %s
        """
    records = self.execute_query(stats_sql, (group_id, days, limit)) or []
    time_fields = ("first_used_at", "last_used_at")
    for record in records:
        # Only datetime values are reformatted; anything else is left as-is.
        record.update({
            field: record[field].strftime("%Y-%m-%d %H:%M:%S")
            for field in time_fields
            if isinstance(record.get(field), datetime)
        })
    return records
def get_group_plugin_summary(self, group_id: str, days: int = 30) -> Dict:
    """Return aggregated plugin call totals for one group.

    Args:
        group_id: ID of the group.
        days: Number of days to look back from now.

    Returns:
        A dict with integer ``total_calls``, ``success_calls``,
        ``failed_calls`` and ``plugin_count`` (0 when missing), plus
        ``last_used_at`` as a ``YYYY-MM-DD HH:MM:SS`` string, or an empty
        string when there is no value.
    """
    summary_sql = """
        SELECT
        SUM(total_calls) AS total_calls,
        SUM(success_calls) AS success_calls,
        SUM(failed_calls) AS failed_calls,
        COUNT(DISTINCT plugin_name) AS plugin_count,
        MAX(last_used_at) AS last_used_at
        FROM t_group_stats
        WHERE group_id = %s
        AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
        """
    row = self.execute_query(summary_sql, (group_id, days), fetch_one=True) or {}

    last_used = row.get("last_used_at")
    if isinstance(last_used, datetime):
        row["last_used_at"] = last_used.strftime("%Y-%m-%d %H:%M:%S")

    # Missing/NULL aggregates collapse to 0.
    summary = {
        field: int(row.get(field) or 0)
        for field in ("total_calls", "success_calls", "failed_calls", "plugin_count")
    }
    summary["last_used_at"] = row.get("last_used_at") or ""
    return summary
def get_user_plugin_stats(self, user_id: str, days: int = 30, limit: int = 10) -> List[Dict]:
    """Return per-plugin/command call statistics for one user.

    Args:
        user_id: ID of the user.
        days: Number of days to look back from now.
        limit: Maximum number of rows to return.

    Returns:
        Rows ordered by total calls (then last use) descending, with
        ``first_used_at``/``last_used_at`` datetimes rendered as
        ``YYYY-MM-DD HH:MM:SS`` strings.
    """
    stats_sql = """
        SELECT
        plugin_name,
        command,
        SUM(total_calls) AS total_calls,
        SUM(success_calls) AS success_calls,
        SUM(failed_calls) AS failed_calls,
        MIN(first_used_at) AS first_used_at,
        MAX(last_used_at) AS last_used_at
        FROM t_user_stats
        WHERE user_id = %s
        AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
        GROUP BY plugin_name, command
        ORDER BY total_calls DESC, last_used_at DESC
        LIMIT %s
        """
    records = self.execute_query(stats_sql, (user_id, days, limit)) or []
    time_fields = ("first_used_at", "last_used_at")
    for record in records:
        # Only datetime values are reformatted; anything else is left as-is.
        record.update({
            field: record[field].strftime("%Y-%m-%d %H:%M:%S")
            for field in time_fields
            if isinstance(record.get(field), datetime)
        })
    return records
def get_user_plugin_summary(self, user_id: str, days: int = 30) -> Dict:
    """Return aggregated plugin call totals for one user.

    Args:
        user_id: ID of the user.
        days: Number of days to look back from now.

    Returns:
        A dict with integer ``total_calls``, ``success_calls``,
        ``failed_calls`` and ``plugin_count`` (0 when missing), plus
        ``last_used_at`` as a ``YYYY-MM-DD HH:MM:SS`` string, or an empty
        string when there is no value.
    """
    summary_sql = """
        SELECT
        SUM(total_calls) AS total_calls,
        SUM(success_calls) AS success_calls,
        SUM(failed_calls) AS failed_calls,
        COUNT(DISTINCT plugin_name) AS plugin_count,
        MAX(last_used_at) AS last_used_at
        FROM t_user_stats
        WHERE user_id = %s
        AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
        """
    row = self.execute_query(summary_sql, (user_id, days), fetch_one=True) or {}

    last_used = row.get("last_used_at")
    if isinstance(last_used, datetime):
        row["last_used_at"] = last_used.strftime("%Y-%m-%d %H:%M:%S")

    # Missing/NULL aggregates collapse to 0.
    summary = {
        field: int(row.get(field) or 0)
        for field in ("total_calls", "success_calls", "failed_calls", "plugin_count")
    }
    summary["last_used_at"] = row.get("last_used_at") or ""
    return summary