Files
abot/admin/dashboard/blueprints/contacts.py
2025-05-07 17:16:25 +08:00

194 lines
5.8 KiB
Python

import asyncio
from flask import Blueprint, render_template, jsonify, request, current_app
from .auth import login_required
from loguru import logger
# 创建联系人管理蓝图
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
# 联系人管理页面
@contacts_bp.route('/')
@login_required
def contacts_management():
    """Render the contacts (address book) management page.

    Returns:
        The rendered ``contacts_management.html`` template.
    """
    page = render_template('contacts_management.html')
    return page
# API路由
@contacts_bp.route('/api/all', methods=['GET'])
@login_required
def api_contacts_all():
    """API endpoint returning all contacts.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"contacts": ...}}``
        built from ``contact_manager.get_contacts()``. If anything raises, the
        error is logged and a ``(JSON body, 500)`` pair is returned whose body
        carries ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"contacts": manager.get_contacts()}
        # Serialization stays inside the try so a non-JSON-able result is
        # reported through the same 500 path.
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取所有联系人信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_contacts_statistics():
    """API endpoint returning contact statistics.

    Returns:
        On success, a JSON body with ``success: True`` and a ``data`` object
        holding the five values produced by
        ``contact_manager.get_contact_statistics()`` under the keys ``total``,
        ``groups``, ``personal``, ``public`` and ``official``. If anything
        raises, the error is logged and a ``(JSON body, 500)`` pair is returned
        with ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        stats = manager.get_contact_statistics()
        # Explicit five-way unpacking: a result of any other length raises
        # here and is handled by the except branch below.
        n_total, n_groups, n_personal, n_public, n_official = stats
        summary = {
            "total": n_total,
            "groups": n_groups,
            "personal": n_personal,
            "public": n_public,
            "official": n_official,
        }
        return jsonify({"success": True, "data": summary})
    except Exception as exc:
        logger.error(f"获取联系人统计信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/groups', methods=['GET'])
@login_required
def api_contacts_groups():
    """API endpoint returning group contacts.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"groups": ...}}``
        built from ``contact_manager.get_group_contacts()``. If anything
        raises, the error is logged and a ``(JSON body, 500)`` pair is returned
        with ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"groups": manager.get_group_contacts()}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取群组联系人信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/personal', methods=['GET'])
@login_required
def api_contacts_personal():
    """API endpoint returning personal contacts.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"personal": ...}}``
        built from ``contact_manager.get_personal_contacts()``. If anything
        raises, the error is logged and a ``(JSON body, 500)`` pair is returned
        with ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"personal": manager.get_personal_contacts()}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取个人联系人信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/official', methods=['GET'])
@login_required
def api_contacts_official():
    """API endpoint returning official-account contacts.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"official": ...}}``
        built from ``contact_manager.get_official_accounts()``. If anything
        raises, the error is logged and a ``(JSON body, 500)`` pair is returned
        with ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"official": manager.get_official_accounts()}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取公众号联系人信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/public', methods=['GET'])
@login_required
def api_contacts_public():
    """API endpoint returning public contacts.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"public": ...}}``
        built from ``contact_manager.get_public_contacts()``. If anything
        raises, the error is logged and a ``(JSON body, 500)`` pair is returned
        with ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"public": manager.get_public_contacts()}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取公共好友信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/head_images', methods=['GET'])
@login_required
def api_head_images():
    """API endpoint returning contact avatar (head image) information.

    Returns:
        On success, a JSON body
        ``{"success": True, "data": {"head_images": ...}}`` built from
        ``contact_manager.get_all_head_images()``. If anything raises, the
        error is logged and a ``(JSON body, 500)`` pair is returned with
        ``success: False`` and the exception text under ``error``.
    """
    try:
        manager = current_app.dashboard_server.contact_manager
        payload = {"head_images": manager.get_all_head_images()}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取联系人头像信息失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/group_members/<roomid>', methods=['GET'])
@login_required
def api_group_members(roomid):
    """API endpoint returning the member list of one group.

    Args:
        roomid: Group ID, taken from the URL path segment and passed
            unchanged to ``contact_db.get_chatroom_member_list``.

    Returns:
        On success, a JSON body ``{"success": True, "data": {"members": ...}}``.
        If anything raises, the error is logged and a ``(JSON body, 500)`` pair
        is returned with ``success: False`` and the exception text under
        ``error``.
    """
    try:
        # Unlike the other endpoints, this one reads from contact_db rather
        # than contact_manager.
        contact_db = current_app.dashboard_server.contact_db
        payload = {"members": contact_db.get_chatroom_member_list(roomid)}
        return jsonify({"success": True, "data": payload})
    except Exception as exc:
        logger.error(f"获取群成员列表失败: {exc}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@contacts_bp.route('/api/update', methods=['POST'])
@login_required
def api_contacts_update():
    """API endpoint that refreshes the contacts database.

    Runs the ``server.robot.refresh_contacts_db()`` coroutine to completion
    with ``asyncio.run`` and reports the outcome.

    Returns:
        A JSON body with ``success: True`` and a success message when the
        refresh result is truthy. When the result is falsy, or when anything
        raises, a ``(JSON body, 500)`` pair with ``success: False`` and a
        failure message; raised errors are also logged.
    """
    try:
        robot = current_app.dashboard_server.robot
        # The refresh is a coroutine on the robot object, driven synchronously.
        # NOTE(review): asyncio.run creates a new event loop per call and
        # cannot be used while another loop is running in the same thread;
        # confirm this handler is always invoked from a plain sync context.
        refreshed = asyncio.run(robot.refresh_contacts_db())
        if not refreshed:
            return jsonify({"success": False, "message": "通讯录更新失败"}), 500
        return jsonify({"success": True, "message": "通讯录更新成功"})
    except Exception as exc:
        logger.error(f"更新通讯录失败: {exc}")
        failure = {"success": False, "message": f"更新通讯录失败: {str(exc)}"}
        return jsonify(failure), 500