Files
abot/admin/dashboard/server.py

612 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
统计看板服务器 - 使用Flask蓝图重构版
"""
import os
import sys
import threading
import time
import secrets
from datetime import timedelta
import toml
from flask import Flask, send_from_directory
from loguru import logger
from db.contacts_db import ContactsDBOperator
from db.admin_account_db import AdminAccountDBOperator
from db.emoji_asset_db import EmojiAssetDB
from db.member_context_db import MemberContextDBOperator
from db.message_storage import MessageStorageDB
from db.message_summary_db import MessageSummaryDBOperator
from db.stats_db import StatsDBOperator
from db.task_db import TaskDBOperator
from db.fun_command_rule_db import FunCommandRuleDBOperator
from utils.fun_command_rule_service import FunCommandRuleService
from wechat_ipad import WechatAPIClient
# 添加项目根目录到系统路径,确保可以导入项目模块
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
class DashboardServer:
"""统计看板服务器"""
@property
def client(self) -> WechatAPIClient | None:
"""动态返回当前 robot 上挂载的微信客户端。
说明:
1. Dashboard 进程通常比 wechat 登录完成更早启动,因此不能只在构造时拍平一份固定引用;
2. 这里每次访问都优先回看 `robot.ipad_bot`,可自动吃到后续 provider 初始化完成后的真实对象;
3. 同时保留 `_client` 兜底,兼容未来如果需要在测试里手工注入 mock client 的场景。
"""
robot = getattr(self, "robot", None)
if robot is not None:
dynamic_client = getattr(robot, "ipad_bot", None)
if dynamic_client is not None:
self._client = dynamic_client
return dynamic_client
return getattr(self, "_client", None)
@client.setter
def client(self, value: WechatAPIClient | None) -> None:
"""允许初始化或测试场景显式覆写当前 client。"""
self._client = value
def __init__(self, host: str = None, port: int = None,
username: str = None, password: str = None,
robot_instance=None):
# 加载配置文件
self.config = self._load_dashboard_config()
# 优先使用传入的参数,其次使用配置文件中的参数
self.host = host or self.config.get("server", {}).get("host", "0.0.0.0")
self.port = port or self.config.get("server", {}).get("port", 8888)
self.username = username or self.config.get("auth", {}).get("username", "admin")
self.password = password or self.config.get("auth", {}).get("password", "admin123")
self.LOG = logger
self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")
# 登录失败限流兜底缓存:
# 1. 优先尝试 Redis但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底;
# 2. 字典内容只保存短期登录失败窗口,不用于持久化;
# 3. 线程化 WSGI 会并发访问,因此需要显式加锁。
self._auth_runtime_lock = threading.Lock()
self._auth_failures = {}
# 如果提供了robot实例则使用其对象
if robot_instance:
self.db_manager = robot_instance.db_manager
self.stats_db = StatsDBOperator(self.db_manager)
# Dashboard 启动可能早于 iPad 登录完成:
# 1. 此时 Robot 上的 message_storage 还没来得及绑定真实 bot
# 2. 但后台很多页面仍然依赖消息存储与表情资产库;
# 3. 因此这里优先复用 Robot 已初始化的 message_storage没有则再安全回退到 DB 层对象。
self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager)
self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager)
# 群运营分析 2.0 会直接复用群消息总结表:
# 1. 这类数据已经由现有插件产出,不需要另起一套采集逻辑;
# 2. 统一在 DashboardServer 上挂载,便于多个后台蓝图复用;
# 3. 即使对应插件未在当前请求时运行,数据库读能力也应保持可用。
self.message_summary_db = MessageSummaryDBOperator(self.db_manager)
self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager)
self.member_context_db = MemberContextDBOperator(self.db_manager)
self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager)
# 后台管理员账号数据层:用于登录鉴权与修改密码。
self.admin_account_db = AdminAccountDBOperator(self.db_manager)
self.system_job_db = robot_instance.system_job_db
self.system_job_loader = robot_instance.system_job_loader
self.plugin_schedule_db = robot_instance.plugin_schedule_db
self.plugin_schedule_manager = robot_instance.plugin_schedule_manager
self.group_plugin_config_db = robot_instance.group_plugin_config_db
self.llm_catalog_db = robot_instance.llm_catalog_db
self.group_plugin_config_service = robot_instance.group_plugin_config_service
# 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”后台配置与缓存。
# 这里统一在 Dashboard 启动时初始化,保证管理端可直接读写规则。
self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager)
self.fun_command_rule_service = FunCommandRuleService(
db_operator=self.fun_command_rule_db,
redis_client=self.db_manager.get_redis_connection(),
local_ttl_seconds=30,
)
self.fun_command_rule_service.init_tables()
self.fun_command_rule_service.refresh_cache()
# 获取联系人管理器实例
self.contact_manager = robot_instance.contact_manager
self.plugin_manager = robot_instance.plugin_manager
self.plugin_registry = robot_instance.plugin_registry
self.robot = robot_instance
# Dashboard 启动时不再强绑一个“拍平后的 client 引用”:
# 1. wechat 线程可能还在 provider 初始化或登录流程中;
# 2. 这里只先记录一个初始值,后续统一通过 `client` 属性动态读取 robot.ipad_bot
# 3. 这样后台页在启动竞态下不会因为抢跑而持有一个永远为 None 的旧引用。
self.client = getattr(robot_instance, "ipad_bot", None)
self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要")
self.member_context_service = getattr(self.member_context_plugin, "service", None)
self.LOG.info("使用Robot实例的对象进行初始化")
# 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。
try:
table_ok = self.admin_account_db.init_tables()
if not table_ok:
self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式")
else:
seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员")
if seed_ok:
self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)")
else:
self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息")
except Exception as e:
self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}")
else:
self.LOG.error("未提供Robot实例Dashboard无法正常工作")
raise ValueError("必须提供Robot实例")
self.app = self._create_app()
self._stop_event = threading.Event()
self._server = None # 存储服务器实例
def _load_dashboard_config(self):
"""加载Dashboard配置文件"""
try:
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml')
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config_data = toml.load(f)
else:
# 如果配置文件不存在,创建默认配置
config_data = {
"server": {"host": "0.0.0.0", "port": 8888},
"auth": {"username": "admin", "password": "ChangeThisPassword_2026!"},
"trendradar_webhook": {
"enabled": False,
"token": "please_change_this_token",
"default_group_ids": [],
"allow_payload_target_groups": False,
"send_timeout_seconds": 20,
},
}
with open(config_path, 'w', encoding='utf-8') as f:
toml.dump(config_data, f)
return self._apply_dashboard_env_overrides(config_data)
except Exception as e:
# 这里属于 Dashboard 构造早期阶段:
# 1. `self.LOG` 还没在 `__init__` 中完成赋值;
# 2. 若此时配置文件损坏,仍然需要把错误稳定打印出来;
# 3. 因此这里直接回退到模块级 logger避免异常再次被属性缺失覆盖。
logger.error(f"加载Dashboard配置文件失败: {e}")
# 返回默认配置
fallback_config = {
"server": {"host": "0.0.0.0", "port": 8888},
"auth": {"username": "admin", "password": "ChangeThisPassword_2026!"},
"trendradar_webhook": {
"enabled": False,
"token": "please_change_this_token",
"default_group_ids": [],
"allow_payload_target_groups": False,
"send_timeout_seconds": 20,
},
}
return self._apply_dashboard_env_overrides(fallback_config)
@staticmethod
def _safe_env_bool(value, default: bool) -> bool:
"""把环境变量里的开关值安全转成布尔值。"""
if value in (None, ""):
return default
text = str(value).strip().lower()
if text in {"1", "true", "yes", "y", "on"}:
return True
if text in {"0", "false", "no", "n", "off"}:
return False
return default
@staticmethod
def _safe_env_int(value, default: int) -> int:
"""把环境变量里的数字安全转成整数。"""
try:
if value in (None, ""):
return default
return int(value)
except (TypeError, ValueError):
return default
@staticmethod
def _safe_env_group_ids(value) -> list[str]:
"""把逗号分隔的群列表环境变量转成数组。"""
raw_text = str(value or "").strip()
if not raw_text:
return []
return [item.strip() for item in raw_text.split(",") if item.strip()]
def _apply_dashboard_env_overrides(self, config_data: dict) -> dict:
"""用环境变量覆盖 Dashboard 配置中的敏感项与部署项。"""
merged_config = dict(config_data or {})
merged_config["server"] = dict(merged_config.get("server", {}) or {})
merged_config["auth"] = dict(merged_config.get("auth", {}) or {})
merged_config["trendradar_webhook"] = dict(merged_config.get("trendradar_webhook", {}) or {})
# 这里优先让运维通过环境变量覆盖后台配置:
# 1. Docker / PaaS 场景下更容易统一管理账号、端口与 token
# 2. 也避免公开仓库里的示例 TOML 被误当成最终生产配置;
# 3. 未配置环境变量时仍回退到本地文件,兼容现有非容器部署方式。
server_section = merged_config["server"]
auth_section = merged_config["auth"]
webhook_section = merged_config["trendradar_webhook"]
server_section["host"] = str(os.environ.get("DASHBOARD_HOST", server_section.get("host", "0.0.0.0")) or "0.0.0.0")
server_section["port"] = self._safe_env_int(
os.environ.get("DASHBOARD_PORT", server_section.get("port", 8888)),
8888,
)
auth_section["username"] = str(
os.environ.get("ABOT_DASHBOARD_USERNAME", auth_section.get("username", "admin")) or "admin"
).strip()
auth_section["password"] = str(
os.environ.get("ABOT_DASHBOARD_PASSWORD", auth_section.get("password", "ChangeThisPassword_2026!"))
or "ChangeThisPassword_2026!"
)
auth_section["session_timeout_minutes"] = self._safe_env_int(
os.environ.get("ABOT_DASHBOARD_SESSION_TIMEOUT_MINUTES", auth_section.get("session_timeout_minutes", 480)),
480,
)
auth_section["max_failed_attempts"] = self._safe_env_int(
os.environ.get("ABOT_DASHBOARD_MAX_FAILED_ATTEMPTS", auth_section.get("max_failed_attempts", 5)),
5,
)
auth_section["lock_seconds"] = self._safe_env_int(
os.environ.get("ABOT_DASHBOARD_LOCK_SECONDS", auth_section.get("lock_seconds", 900)),
900,
)
auth_section["cookie_secure"] = self._safe_env_bool(
os.environ.get("ABOT_DASHBOARD_COOKIE_SECURE", auth_section.get("cookie_secure", False)),
False,
)
auth_section["cookie_samesite"] = str(
os.environ.get("ABOT_DASHBOARD_COOKIE_SAMESITE", auth_section.get("cookie_samesite", "Lax")) or "Lax"
).strip()
webhook_section["enabled"] = self._safe_env_bool(
os.environ.get("ABOT_TRENDRADAR_WEBHOOK_ENABLED", webhook_section.get("enabled", False)),
False,
)
webhook_section["token"] = str(
os.environ.get("ABOT_TRENDRADAR_WEBHOOK_TOKEN", webhook_section.get("token", "")) or ""
).strip()
env_default_groups = os.environ.get("ABOT_TRENDRADAR_DEFAULT_GROUP_IDS", "")
if env_default_groups not in (None, ""):
webhook_section["default_group_ids"] = self._safe_env_group_ids(env_default_groups)
webhook_section["allow_payload_target_groups"] = self._safe_env_bool(
os.environ.get(
"ABOT_TRENDRADAR_ALLOW_PAYLOAD_TARGET_GROUPS",
webhook_section.get("allow_payload_target_groups", False),
),
False,
)
webhook_section["send_timeout_seconds"] = self._safe_env_int(
os.environ.get(
"ABOT_TRENDRADAR_SEND_TIMEOUT_SECONDS",
webhook_section.get("send_timeout_seconds", 20),
),
20,
)
return merged_config
def _create_app(self) -> Flask:
"""创建Flask应用"""
# 指定模板文件夹路径
template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
app = Flask(__name__, template_folder=template_folder)
auth_config = self.config.get("auth", {}) or {}
session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480)
cookie_secure = bool(auth_config.get("cookie_secure", False))
configured_secret = str(
os.environ.get("ABOT_DASHBOARD_SECRET_KEY")
or auth_config.get("secret_key", "")
or ""
).strip()
if configured_secret:
app.secret_key = configured_secret
else:
# 若未显式配置 secret_key则每次进程启动生成随机值
# 1. 这比固定硬编码密钥安全得多;
# 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的;
# 3. 同时输出 warning提醒后续最好通过配置或环境变量固定注入。
app.secret_key = secrets.token_hex(32)
self.LOG.warning("未配置 Dashboard secret_key已使用进程级随机密钥重启后现有登录会失效")
# 禁用模板缓存使修改HTML文件后立即生效 False =重启才生效
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax")
app.config['SESSION_COOKIE_SECURE'] = cookie_secure
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes))
# 设置Werkzeug日志级别为DEBUG
import logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)
# 将dashboard_server实例设置为app的属性
app.dashboard_server = self
# 配置静态文件访问
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory(static_folder, filename)
# 获取项目根目录下的static/images目录
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
images_dir = os.path.join(project_root, "static", "images")
# 确保目录存在
os.makedirs(images_dir, exist_ok=True)
@app.route('/static/images/<path:filename>')
def serve_images(filename):
return send_from_directory(images_dir, filename)
# 添加一个路由处理favicon请求
@app.route('/favicon.ico')
def favicon():
return send_from_directory(os.path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon')
# 注册蓝图
self._register_blueprints(app)
return app
def get_auth_policy(self) -> dict:
"""读取后台认证策略配置。"""
auth_config = self.config.get("auth", {}) or {}
return {
"max_failed_attempts": max(3, int(auth_config.get("max_failed_attempts", 5) or 5)),
"lock_seconds": max(60, int(auth_config.get("lock_seconds", 900) or 900)),
"session_timeout_minutes": max(15, int(auth_config.get("session_timeout_minutes", 480) or 480)),
}
@staticmethod
def _build_login_guard_key(username: str, remote_ip: str) -> str:
"""构建登录限流键。"""
return f"{str(username or '').strip().lower()}::{str(remote_ip or '').strip() or 'unknown'}"
def _get_login_failure_record(self, guard_key: str) -> dict:
"""读取登录失败记录,优先 Redis失败时回退到进程内缓存。"""
try:
redis_conn = self.db_manager.get_redis_connection()
raw_value = redis_conn.get(f"dashboard:auth:fail:{guard_key}")
if raw_value:
parts = str(raw_value).split("|", 1)
return {
"count": int(parts[0] or 0),
"first_failed_at": float(parts[1] or 0) if len(parts) > 1 else 0.0,
}
except Exception:
pass
with self._auth_runtime_lock:
record = dict(self._auth_failures.get(guard_key, {}) or {})
expire_at = float(record.get("expire_at") or 0.0)
if expire_at > 0 and expire_at <= time.time():
self._auth_failures.pop(guard_key, None)
record = {}
return {
"count": int(record.get("count") or 0),
"first_failed_at": float(record.get("first_failed_at") or 0.0),
}
def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None:
"""保存登录失败记录,优先 Redis失败时回退到进程内缓存。"""
payload = f"{int(count)}|{float(first_failed_at)}"
try:
redis_conn = self.db_manager.get_redis_connection()
redis_conn.setex(f"dashboard:auth:fail:{guard_key}", ttl_seconds, payload)
return
except Exception:
pass
expire_at = time.time() + ttl_seconds
with self._auth_runtime_lock:
self._auth_failures[guard_key] = {
"count": int(count),
"first_failed_at": float(first_failed_at),
"expire_at": expire_at,
}
def clear_login_failures(self, username: str, remote_ip: str) -> None:
"""清理登录失败记录。"""
guard_key = self._build_login_guard_key(username, remote_ip)
try:
redis_conn = self.db_manager.get_redis_connection()
redis_conn.delete(f"dashboard:auth:fail:{guard_key}")
except Exception:
pass
with self._auth_runtime_lock:
self._auth_failures.pop(guard_key, None)
def get_login_lock_status(self, username: str, remote_ip: str) -> dict:
"""获取当前账号/IP 组合的登录锁定状态。"""
policy = self.get_auth_policy()
record = self._get_login_failure_record(self._build_login_guard_key(username, remote_ip))
count = int(record.get("count") or 0)
first_failed_at = float(record.get("first_failed_at") or 0.0)
if count < policy["max_failed_attempts"] or first_failed_at <= 0:
return {"locked": False, "remaining_seconds": 0, "failed_count": count}
elapsed = max(0, int(time.time() - first_failed_at))
remaining_seconds = max(0, policy["lock_seconds"] - elapsed)
if remaining_seconds <= 0:
self.clear_login_failures(username, remote_ip)
return {"locked": False, "remaining_seconds": 0, "failed_count": 0}
return {"locked": True, "remaining_seconds": remaining_seconds, "failed_count": count}
def mark_login_failure(self, username: str, remote_ip: str) -> dict:
"""记录一次登录失败,并返回最新锁定状态。"""
policy = self.get_auth_policy()
guard_key = self._build_login_guard_key(username, remote_ip)
record = self._get_login_failure_record(guard_key)
count = int(record.get("count") or 0)
first_failed_at = float(record.get("first_failed_at") or 0.0)
now_ts = time.time()
if first_failed_at <= 0:
first_failed_at = now_ts
count += 1
self._save_login_failure_record(guard_key, count, first_failed_at, policy["lock_seconds"])
return self.get_login_lock_status(username, remote_ip)
def should_force_password_change(self, username: str) -> bool:
"""判断当前管理员是否应该被强制提示修改密码。"""
admin_db = getattr(self, "admin_account_db", None)
normalized_username = str(username or "").strip()
if not normalized_username:
return False
# 判断弱密码时优先且尽量只相信数据库中的真实账号状态。
#
# 之前的问题在于:
# 1. 代码先看数据库,再继续拿本地 config.toml 的默认用户名/密码做补充判断;
# 2. 如果数据库中的管理员已经改成强密码,但本地配置仍保留 admin/admin123
# 前端仍会被误判成“当前账号在使用弱密码”;
# 3. 这会让后台改密提示和真实账号状态脱节。
#
# 现在的策略改成:
# - 数据库里存在该管理员账号:只按数据库口令哈希判断;
# - 数据库里不存在该账号,才允许回退到旧配置模式,兼容尚未迁移完成的老部署。
if admin_db:
try:
admin_row = admin_db.get_admin_by_username(normalized_username)
if admin_row is not None:
return admin_db.is_using_risky_password(normalized_username)
except Exception as e:
# 安全提示属于辅助能力,数据库偶发异常时不应因为误判把用户“锁”在改密弹窗里。
# 因此这里记录 warning并继续走兼容兜底而不是直接强制提示弱密码。
self.LOG.warning(f"读取后台账号安全状态失败,将尝试兼容兜底判断: username={normalized_username}, error={e}")
fallback_username = str(self.username or "").strip()
fallback_password = str(self.password or "").strip()
risky_passwords = getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"})
return normalized_username == fallback_username and fallback_password in risky_passwords
def _register_blueprints(self, app):
"""注册所有蓝图"""
# 在函数内部导入蓝图,避免循环导入
from admin.dashboard.blueprints.auth import auth_bp
from admin.dashboard.blueprints.contacts import contacts_bp
from admin.dashboard.blueprints.robot import robot_bp
from admin.dashboard.blueprints.messages import messages_bp
from admin.dashboard.blueprints.stats import stats_bp
from admin.dashboard.blueprints.system import system_bp
from admin.dashboard.blueprints.main import main_bp
from admin.dashboard.blueprints.plugin_routes import plugin_routes
from admin.dashboard.blueprints.virtual_group import virtual_group_bp
from admin.dashboard.blueprints.file_browser import file_browser_bp
from admin.dashboard.blueprints.message_push import message_push_bp
from admin.dashboard.blueprints.friend_circle import friend_circle_bp
from admin.dashboard.blueprints.system_jobs import system_jobs_bp
from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp
from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp
from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp
from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp
# 在app.register_blueprint部分添加
app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group')
app.register_blueprint(auth_bp)
app.register_blueprint(main_bp)
app.register_blueprint(contacts_bp)
app.register_blueprint(robot_bp)
app.register_blueprint(messages_bp)
app.register_blueprint(stats_bp)
app.register_blueprint(system_bp)
app.register_blueprint(plugin_routes)
app.register_blueprint(file_browser_bp)
app.register_blueprint(message_push_bp)
app.register_blueprint(friend_circle_bp)
app.register_blueprint(system_jobs_bp)
app.register_blueprint(plugin_schedules_bp)
app.register_blueprint(group_plugin_config_bp)
app.register_blueprint(fun_command_rules_bp)
app.register_blueprint(trendradar_webhook_bp)
self.LOG.info("所有蓝图已注册")
def run(self):
"""运行服务器"""
from werkzeug.serving import make_server
# 设置Werkzeug日志级别为DEBUG
import logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)
self.LOG.info(f"启动服务器: {self.host}:{self.port}")
try:
# Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会导致整个后台无响应。
# 改为 threaded server避免某个接口阻塞后拖死所有页面访问。
self._server = make_server(self.host, self.port, self.app, threaded=True)
self._server.serve_forever()
except Exception as e:
self.LOG.error(f"服务器运行失败: {e}")
self._stop_event.set()
def stop(self):
"""停止服务器"""
self.LOG.info("正在停止服务器...")
self._stop_event.set()
# 使用werkzeug服务器的关闭方法
if self._server:
self._server.shutdown()
self.LOG.info("服务器已停止")
def get_current_user_info(self):
"""获取当前登录的微信用户信息"""
try:
if not self.client:
self.LOG.error("client实例不可用无法获取当前用户信息")
return {"success": False, "message": "实例不可用"}
# 获取当前登录的微信ID
# 从新的resp格式中获取用户信息
try:
if self.robot is None:
raise ValueError("机器人对象未初始化")
user_data = {
"wxid": getattr(self.robot, "wxid", ""),
"nickName": getattr(self.robot, "nickname", ""),
"mobile": getattr(self.robot, "phone", ""),
"smallHeadImgUrl": getattr(self.robot, "head_image", ""),
"signature": getattr(self.robot, "signature", "")
}
except (AttributeError, ValueError) as e:
print(f"获取用户信息出错: {str(e)}")
user_data = {
"wxid": self.robot.wxid,
"nickName": self.robot.nickname,
"mobile": self.robot.phone,
"smallHeadImgUrl": self.robot.head_image,
"signature": self.robot.signature
}
if not user_data:
return {"success": False, "message": "未获取到用户数据"}
return {
"success": True,
"data": {
"wx_id": user_data.get("wxid", ""),
"nickname": user_data.get("nickName", "未知用户"),
"avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像URL
"mobile": user_data.get("mobile", ""),
"home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息
"signature": user_data.get("signature", "")[:10]
}
}
except Exception as e:
self.LOG.error(f"获取当前用户信息失败: {e}")
return {"success": False, "message": f"获取用户信息出错: {str(e)}"}