Files
abot/db/contacts_db.py
liuwei 23b5d5bef0 优化群管理详情页展示与群成员统计口径
1. 群成员列表仅展示仍在群内的成员,排除已退群用户,避免僵尸成员与成员明细口径不一致。

2. 群成员列表按最后发言时间倒序排序,最近活跃成员优先展示,未发言成员排在后面。

3. 群详情启用功能区域的最后消息改为紧凑预览,图片、视频、链接、表情、XML、系统消息统一显示标记,不再直接展示原始内容。

4. 群功能权限区域默认折叠,需手动展开后再查看和操作,降低详情弹窗的信息噪音。

5. 进群欢迎配置区域默认折叠,需手动展开后再查看和编辑群级差异化欢迎配置。
2026-04-30 14:12:01 +08:00

1012 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
微信联系人数据库操作类
用于管理微信联系人信息的存储和查询
"""
from datetime import datetime
from loguru import logger
import json
from typing import List, Dict, Optional, Union, Any
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class ContactsDBOperator(BaseDBOperator):
"""微信联系人数据库操作类"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self.LOG = logger
# 缓存群公告字段是否存在,避免反复查询 information_schema。
self._chatroom_announcement_column_ready: Optional[bool] = None
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查当前数据库中指定表字段是否存在。"""
try:
sql = """
SELECT COUNT(1) AS cnt
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = %s
AND COLUMN_NAME = %s
"""
result = self.execute_query(sql, (table_name, column_name), fetch_one=True) or {}
return int(result.get("cnt") or 0) > 0
except Exception as e:
self.LOG.error(f"检查字段存在性失败: {table_name}.{column_name}, error={e}")
return False
def _ensure_chatroom_announcement_column(self) -> bool:
"""确保群公告字段存在;老库不支持 IF NOT EXISTS 时也可兼容。"""
if self._chatroom_announcement_column_ready is True:
return True
if self._chatroom_announcement_column_ready is False:
return False
# 先探测:存在则直接标记成功。
if self._column_exists("t_chatrooms", "chat_room_announcement"):
self._chatroom_announcement_column_ready = True
return True
# 再补列:不使用 IF NOT EXISTS兼容较低版本 MySQL。
alter_sql = """
ALTER TABLE t_chatrooms
ADD COLUMN chat_room_announcement TEXT COMMENT '群公告内容'
"""
ok = self.execute_update(alter_sql)
if ok or self._column_exists("t_chatrooms", "chat_room_announcement"):
self._chatroom_announcement_column_ready = True
self.LOG.info("已确认 t_chatrooms.chat_room_announcement 字段可用")
return True
# 失败时降级,后续查询将不再引用该字段,避免 Unknown column。
self._chatroom_announcement_column_ready = False
self.LOG.warning("t_chatrooms.chat_room_announcement 字段不可用,将按无公告字段模式运行")
return False
def _ensure_table_exists(self):
"""确保联系人表存在"""
try:
# 创建联系人表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_wechat_contacts (
id INT AUTO_INCREMENT PRIMARY KEY,
user_name VARCHAR(64) NOT NULL COMMENT '微信ID',
nick_name VARCHAR(128) COMMENT '昵称',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
sex TINYINT COMMENT '性别1男2女0未知',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
sns_bg_img TEXT COMMENT '朋友圈背景图',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
description TEXT COMMENT '描述',
card_img_url TEXT COMMENT '名片图片URL',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
type ENUM('friends', 'chatrooms', 'ghs') NOT NULL COMMENT '联系人类型:好友、群聊、公众号',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_user_name` (`user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信联系人信息表';
""")
# 创建群成员表 - 增加了更多字段以支持详细信息
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatroom_member (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
nick_name VARCHAR(128) COMMENT '成员昵称',
display_name VARCHAR(128) COMMENT '群内显示名称',
inviter_user_name VARCHAR(64) COMMENT '邀请人微信ID',
member_flag INT COMMENT '成员标志2049表示管理员',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
is_owner TINYINT(1) DEFAULT 0 COMMENT '是否群主0否1是',
is_admin TINYINT(1) DEFAULT 0 COMMENT '是否管理员0否1是',
sex TINYINT COMMENT '性别1男2女0未知',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
latest_active_time DATETIME NULL COMMENT '最后活跃时间',
status TINYINT DEFAULT 1 COMMENT '1-在群里2-已退群',
UNIQUE KEY `idx_chatroom_member` (`chatroom_id`, `wxid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群成员信息表';
""")
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatrooms (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
nick_name VARCHAR(128) COMMENT '群昵称',
py_initial VARCHAR(128) COMMENT '群昵称拼音首字母',
quan_pin VARCHAR(256) COMMENT '群昵称全拼',
sex TINYINT COMMENT '性别',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
chat_room_notify TINYINT COMMENT '群通知',
chat_room_owner VARCHAR(64) COMMENT '群主微信ID',
chat_room_announcement TEXT COMMENT '群公告内容',
small_head_img_url TEXT COMMENT '群头像URL',
member_list TEXT COMMENT '成员列表(JSON)',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_chatroom_id` (`chatroom_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群信息表';
""")
# 兼容历史库:启动时补齐群公告字段;失败时后续逻辑自动降级兼容。
self._ensure_chatroom_announcement_column()
self.LOG.info("成功创建或确认微信群信息表存在")
except Exception as e:
self.LOG.error(f"创建微信联系人表或群成员表失败: {e}")
raise
def save_contacts(self, contacts_data: List[Dict], contact_type: str, only_insert: bool = False) -> bool:
"""保存联系人信息到数据库
Args:
contacts_data: 联系人数据列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
only_insert: 为True时仅写入不存在的联系人已存在则跳过
Returns:
bool: 是否成功保存
"""
if not contacts_data:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for contact in contacts_data:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = contact.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'user_name': get_str('UserName'),
'nick_name': get_str('NickName'),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': contact.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'signature': contact.get('Signature', ''),
'alias': contact.get('Alias', ''),
'sns_bg_img': contact.get('SnsUserInfo', {}).get('SnsBgimgId', ''),
'country': contact.get('Country', ''),
'province': contact.get('Province', ''),
'city': contact.get('City', ''),
'big_head_img_url': contact.get('BigHeadImgUrl', ''),
'small_head_img_url': contact.get('SmallHeadImgUrl', ''),
'description': '', # 可根据需要补充
'card_img_url': '', # 可根据需要补充
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'type': contact_type
}
# 构建SQL语句
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
if only_insert:
sql = f"""
INSERT IGNORE INTO t_wechat_contacts ({fields})
VALUES ({placeholders})
"""
else:
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'user_name'])
sql = f"""
INSERT INTO t_wechat_contacts ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存{len(contacts_data)}{contact_type}类型的联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的联系人失败: {e}")
return False
def save_simple_contacts(self, contact_list: List[str], contact_type: str) -> bool:
"""保存简单联系人列表只有user_name到数据库
Args:
contact_list: 联系人ID列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
bool: 是否成功保存
"""
if not contact_list:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for user_name in contact_list:
# 构建SQL语句
sql = """
INSERT INTO t_wechat_contacts (user_name, type)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE type = VALUES(type), update_time = CURRENT_TIMESTAMP
"""
self.execute_update(sql, (user_name, contact_type))
self.LOG.info(f"成功保存{len(contact_list)}{contact_type}类型的简单联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的简单联系人失败: {e}")
return False
def get_contacts_by_type(self, contact_type: str) -> List[Dict]:
"""根据类型获取联系人列表
Args:
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
List[Dict]: 联系人列表
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE type = %s
ORDER BY nick_name
"""
results = self.execute_query(sql, (contact_type,))
return results
except Exception as e:
self.LOG.error(f"获取{contact_type}类型的联系人失败: {e}")
return []
def get_contact_by_user_name(self, user_name: str) -> Optional[Dict]:
"""根据user_name获取联系人信息
Args:
user_name: 联系人ID
Returns:
Optional[Dict]: 联系人信息如果不存在则返回None
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE user_name = %s
LIMIT 1
"""
result = self.execute_query(sql, (user_name,), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取联系人{user_name}失败: {e}")
return None
def get_display_name(self, user_name: str) -> str:
"""获取联系人的显示名称优先使用备注其次是昵称最后是微信ID
Args:
user_name: 联系人ID
Returns:
str: 显示名称
"""
contact = self.get_contact_by_user_name(user_name)
if not contact:
return user_name
return contact.get('remark') or contact.get('nick_name') or user_name
def get_all_contacts(self) -> Dict[str, str]:
"""获取所有联系人信息
Returns:
Dict[str, str]: 联系人字典,格式为 {"wxid": "NickName"}
"""
try:
sql = """
SELECT user_name, nick_name, remark
FROM t_wechat_contacts
union all
SELECT wxid as user_name, nick_name, display_name as remark
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, nick_name, remark as remark
FROM t_chatrooms
"""
results = self.execute_query(sql)
contacts_dict = {}
for result in results:
user_name = result.get('user_name')
remark = result.get('remark')
nick_name = result.get('nick_name')
# 优先使用备注其次是昵称最后是微信ID。如果存在remark,则用remark(nick_name)
if remark:
if nick_name and remark != nick_name:
display_name = f"{remark}({nick_name})"
else:
display_name = remark
else:
display_name = nick_name or user_name
contacts_dict[user_name] = display_name
self.LOG.info(f"从数据库获取了 {len(contacts_dict)} 个联系人信息")
return contacts_dict
except Exception as e:
self.LOG.error(f"获取所有联系人信息失败: {e}")
return {}
def save_chatroom_member_simple(self, chatroom_id: str, member_details: List[Dict], only_insert: bool = False) -> bool:
"""
保存群成员简要信息到数据库,兼容不同数据结构
Args:
chatroom_id: 群聊ID
member_details: 群成员信息列表
Returns:
bool: 是否成功保存
"""
if not member_details:
self.LOG.warning(f"{chatroom_id} 没有成员数据需要保存")
return True
try:
for member in member_details:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = member.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'chatroom_id': chatroom_id,
'wxid': get_str('UserName'),
'nick_name': get_str('NickName'),
'display_name': get_str('DisplayName'),
'inviter_user_name': get_str('InviterUserName'),
'member_flag': member.get('MemberFlag', 0),
'big_head_img_url': get_str('BigHeadImgUrl'),
'small_head_img_url': get_str('SmallHeadImgUrl'),
'is_owner': 1 if member.get('IsOwner', False) else 0,
'is_admin': 1 if member.get('IsAdmin', False) else 0,
'sex': member.get('Sex', 0),
'signature': get_str('Signature'),
'alias': get_str('Alias'),
'country': get_str('Country'),
'province': get_str('Province'),
'city': get_str('City'),
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'status': 1,
}
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
if only_insert:
sql = f"""
INSERT IGNORE INTO t_chatroom_member ({fields})
VALUES ({placeholders})
"""
else:
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k not in ('chatroom_id', 'wxid')])
sql = f"""
INSERT INTO t_chatroom_member ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群 {chatroom_id}{len(member_details)} 个成员信息")
return True
except Exception as e:
self.LOG.error(f"保存群 {chatroom_id} 成员信息失败: {e}")
return False
def save_chatroom_member_detail(self, chatroom_id: str, member_details: List[Dict]) -> bool:
"""保存群成员详细信息到数据库
Args:
chatroom_id: 群聊ID
member_details: 群成员详细信息列表
Returns:
bool: 是否成功保存
"""
if not member_details or not chatroom_id:
self.LOG.warning(f"没有群聊{chatroom_id}的成员详细信息需要保存")
return False
try:
# 获取现有的群成员信息,以便更新而不是替换
existing_members_sql = """
SELECT wxid, is_owner, is_admin FROM t_chatroom_member
WHERE chatroom_id = %s
"""
existing_members_result = self.execute_query(existing_members_sql, (chatroom_id,))
existing_members = {row.get('wxid'): (row.get('is_owner'), row.get('is_admin'))
for row in existing_members_result}
for member in member_details:
wxid = member.get('userName', '')
if not wxid:
continue
# 保留现有的群主和管理员标识
is_owner, is_admin = 0, 0
if wxid in existing_members:
is_owner, is_admin = existing_members[wxid]
# 处理电话号码列表
phone_num_list = member.get('phoneNumList', [])
if phone_num_list:
phone_num_str = json.dumps(phone_num_list)
else:
phone_num_str = ''
# 构建数据
data = {
'chatroom_id': chatroom_id,
'wxid': wxid,
'nick_name': member.get('nickName', ''),
'display_name': member.get('remark', ''), # 使用备注作为群内显示名称
'inviter_user_name': member.get('inviterUserName', ''),
'member_flag': member.get('memberFlag', 0),
'big_head_img_url': member.get('bigHeadImgUrl', ''),
'small_head_img_url': member.get('smallHeadImgUrl', ''),
'is_owner': is_owner,
'is_admin': is_admin,
# 额外的详细信息字段
'sex': member.get('sex', 0),
'signature': member.get('signature', ''),
'alias': member.get('alias', ''),
'country': member.get('country', ''),
'province': member.get('province', ''),
'city': member.get('city', ''),
'label_list': member.get('labelList', ''),
'phone_num_list': phone_num_str,
'py_initial': member.get('pyInitial', ''),
'quan_pin': member.get('quanPin', ''),
'remark_py_initial': member.get('remarkPyInitial', ''),
'remark_quan_pin': member.get('remarkQuanPin', '')
}
# 构建SQL语句 - 使用REPLACE INTO确保更新现有记录
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
sql = f"""
REPLACE INTO t_chatroom_member ({fields})
VALUES ({placeholders})
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊{chatroom_id}{len(member_details)}个成员详细信息")
return True
except Exception as e:
self.LOG.error(f"保存群聊{chatroom_id}的成员详细信息失败: {e}")
return False
def process_chatroom_member_detail_response(self, chatroom_id: str, response: Dict) -> bool:
"""处理获取群成员详情的API响应
Args:
chatroom_id: 群聊ID
response: API响应数据
Returns:
bool: 是否成功处理
"""
try:
if response.get('ret') != 200:
self.LOG.error(f"获取群聊{chatroom_id}成员详情失败: {response.get('msg')}")
return False
data = response.get('data', [])
if not data:
self.LOG.warning(f"群聊{chatroom_id}成员详情数据为空")
return False
return self.save_chatroom_member_detail(chatroom_id, data)
except Exception as e:
self.LOG.error(f"处理群聊{chatroom_id}成员详情数据失败: {e}")
return False
def save_chatroom_info(self, chatroom_data: dict) -> bool:
"""保存群信息到数据库,兼容微信协议风格字段"""
try:
def get_str(field, default=""):
val = chatroom_data.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
# 群公告在不同协议/版本里字段名不一致,这里做一次兼容提取,避免前端拿不到内容。
announcement = (
get_str('ChatRoomAnnouncement')
or get_str('Announcement')
or get_str('Annoucement')
or get_str('AnnouncementContent')
or get_str('chatRoomAnnouncement')
)
data = {
'chatroom_id': get_str('UserName'),
'nick_name': get_str('NickName',"未知群名"),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': chatroom_data.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'chat_room_notify': chatroom_data.get('ChatRoomNotify', 0),
'chat_room_owner': chatroom_data.get('ChatRoomOwner', ''),
'small_head_img_url': chatroom_data.get('SmallHeadImgUrl', ''),
# 成员列表可选存储为JSON字符串
'member_list': json.dumps(chatroom_data.get('NewChatroomData', {}).get('ChatRoomMember', []),
ensure_ascii=False)
}
# 字段存在才写群公告,避免老库 Unknown column。
if self._ensure_chatroom_announcement_column():
data['chat_room_announcement'] = announcement
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'chatroom_id'])
sql = f"""
INSERT INTO t_chatrooms ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊信息: {data['chatroom_id']}")
return True
except Exception as e:
self.LOG.error(f"保存群聊信息失败: {e}")
return False
def get_chatroom_info(self, chatroom_id: str) -> Optional[dict]:
"""获取群信息"""
try:
sql = "SELECT * FROM t_chatrooms WHERE chatroom_id = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id,), fetch_one=True)
if result and result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return result
except Exception as e:
self.LOG.error(f"获取群聊{chatroom_id}信息失败: {e}")
return None
def update_chatroom_info(self, chatroom_id: str, update_data: dict) -> bool:
"""更新群信息"""
try:
set_clause = ', '.join([f"{k}=%s" for k in update_data.keys()])
values = list(update_data.values())
values.append(chatroom_id)
sql = f"UPDATE t_chatrooms SET {set_clause} WHERE chatroom_id = %s"
self.execute_update(sql, tuple(values))
self.LOG.info(f"成功更新群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"更新群聊{chatroom_id}信息失败: {e}")
return False
def mark_chatroom_members_left(self, chatroom_id: str, active_member_wxids: Optional[List[str]] = None) -> bool:
"""将不在当前群成员列表中的用户标记为已退群。"""
try:
active_member_wxids = [wxid for wxid in (active_member_wxids or []) if wxid]
if active_member_wxids:
placeholders = ', '.join(['%s'] * len(active_member_wxids))
sql = f"""
UPDATE t_chatroom_member
SET status = 2
WHERE chatroom_id = %s AND wxid NOT IN ({placeholders})
"""
params = (chatroom_id, *active_member_wxids)
else:
sql = """
UPDATE t_chatroom_member
SET status = 2
WHERE chatroom_id = %s
"""
params = (chatroom_id,)
self.execute_update(sql, params)
self.LOG.info(f"已将群 {chatroom_id} 中缺失成员标记为已退群")
return True
except Exception as e:
self.LOG.error(f"标记群{chatroom_id}成员退群失败: {e}")
return False
def mark_chatroom_members_active(self, chatroom_id: str, active_member_wxids: List[str]) -> bool:
"""将当前仍在群里的成员标记为在群。"""
try:
active_member_wxids = [wxid for wxid in (active_member_wxids or []) if wxid]
if not active_member_wxids:
return True
placeholders = ', '.join(['%s'] * len(active_member_wxids))
sql = f"""
UPDATE t_chatroom_member
SET status = 1
WHERE chatroom_id = %s AND wxid IN ({placeholders})
"""
self.execute_update(sql, (chatroom_id, *active_member_wxids))
return True
except Exception as e:
self.LOG.error(f"恢复群{chatroom_id}成员在群状态失败: {e}")
return False
def delete_chatroom_info(self, chatroom_id: str) -> bool:
"""删除群信息"""
try:
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}信息失败: {e}")
return False
def delete_chatroom_members_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 的成员信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}的成员信息失败: {e}")
return False
# 新增获取群列表接口
def get_chatroom_list(self) -> List[dict]:
"""获取群列表"""
try:
sql = "SELECT * FROM t_chatrooms"
results = self.execute_query(sql)
for result in results:
if result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return results
except Exception as e:
self.LOG.error(f"获取群列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s"
results = self.execute_query(sql, (chatroom_id,))
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_small_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表。
说明:
1. 群详情页的成员列表只展示当前仍在群内的成员,已退群成员不再返回;
2. 成员需要按最后发言时间倒序展示,方便后台优先看到最近活跃的人;
3. 从未发言的成员排在后面,避免把“无活跃记录”的成员顶到前面。
"""
try:
sql = """
SELECT wxid, nick_name, display_name, status, latest_active_time, small_head_img_url
FROM t_chatroom_member
WHERE chatroom_id = %s
AND status = 1
ORDER BY
CASE WHEN latest_active_time IS NULL THEN 1 ELSE 0 END ASC,
latest_active_time DESC,
COALESCE(NULLIF(display_name, ''), nick_name, wxid) ASC
"""
results = self.execute_query(sql, (chatroom_id,))
for row in results:
dt = row.get("latest_active_time")
if isinstance(dt, datetime):
row["latest_active_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
def get_chatroom_profile(self, chatroom_id: str) -> Dict[str, Any]:
"""获取群资料(群公告、群主、管理员、成员数)用于通讯录详情展示。"""
try:
# 公告字段在老库可能不存在:不存在则降级为空串,避免 SQL 报错。
announcement_expr = (
"c.chat_room_announcement"
if self._ensure_chatroom_announcement_column()
else "''"
)
# 先取群基础信息 + 群主展示名 + 成员数。
info_sql = f"""
SELECT
c.chatroom_id,
c.nick_name,
c.chat_room_owner,
{announcement_expr} AS chat_room_announcement,
COALESCE(NULLIF(owner_member.display_name, ''), owner_member.nick_name, c.chat_room_owner, '') AS owner_name,
(
SELECT COUNT(*)
FROM t_chatroom_member m
WHERE m.chatroom_id = c.chatroom_id AND m.status = 1
) AS member_count
FROM t_chatrooms c
LEFT JOIN t_chatroom_member owner_member
ON owner_member.chatroom_id = c.chatroom_id
AND owner_member.wxid = c.chat_room_owner
WHERE c.chatroom_id = %s
LIMIT 1
"""
info = self.execute_query(info_sql, (chatroom_id,), fetch_one=True) or {}
# 使用既有身份字段 is_admin 组装管理员列表,直接复用现有数据。
admin_sql = """
SELECT
wxid,
COALESCE(NULLIF(display_name, ''), nick_name, wxid) AS display_name
FROM t_chatroom_member
WHERE chatroom_id = %s AND is_admin = 1 AND status = 1
ORDER BY display_name
"""
admins = self.execute_query(admin_sql, (chatroom_id,)) or []
return {
"chatroom_id": chatroom_id,
"nick_name": info.get("nick_name", ""),
"owner_wxid": info.get("chat_room_owner", "") or "",
"owner_name": info.get("owner_name", "") or "",
"announcement": info.get("chat_room_announcement", "") or "",
"member_count": int(info.get("member_count") or 0),
"admin_count": len(admins),
"admins": admins,
}
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}资料失败: {e}")
return {
"chatroom_id": chatroom_id,
"nick_name": "",
"owner_wxid": "",
"owner_name": "",
"announcement": "",
"member_count": 0,
"admin_count": 0,
"admins": [],
}
# 获取群成员的昵称信息
def get_chatroom_member_list_name_all(self) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT chatroom_id,wxid, COALESCE(NULLIF(display_name,''), nick_name, wxid) as nick_name FROM t_chatroom_member"
results = self.execute_query(sql)
return results
except Exception as e:
self.LOG.error(f"获取群成员列表失败: {e}")
return []
# 新增获取群成员信息接口
def get_chatroom_member_info(self, chatroom_id: str, wxid: str) -> Optional[dict]:
"""获取群成员信息"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s AND wxid = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id, wxid), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员{wxid}信息失败: {e}")
return None
# 新增群信息删除功能
def delete_chatroom_all_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群{chatroom_id}信息")
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群{chatroom_id}信息失败: {e}")
return False
# 新增删除所有联系人功能
def delete_all_contacts(self) -> bool:
"""删除所有联系人信息"""
try:
sql = "DELETE FROM t_wechat_contacts"
self.execute_update(sql)
self.LOG.info(f"成功删除所有联系人信息")
return True
except Exception as e:
self.LOG.error(f"删除所有联系人信息失败: {e}")
return False
# 新增获取所有联系人头像信息接口
def get_all_contacts_avatar(self) -> Dict[str, str]:
"""获取所有联系人头像信息"""
try:
sql = """
SELECT user_name, small_head_img_url
FROM t_wechat_contacts
union all
SELECT wxid as user_name, small_head_img_url
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, small_head_img_url
FROM t_chatrooms
"""
results = self.execute_query(sql)
# 返回DICT
results = {result['user_name']: result['small_head_img_url'] for result in results}
return results
except Exception as e:
self.LOG.error(f"获取所有联系人头像信息失败: {e}")
return []
# 新增更新群成员最后活跃时间接口
def update_chatroom_member_active_time(self, chatroom_id: str, wxid: str) -> bool:
"""更新群成员的最后活跃时间"""
try:
# 只有当成员确实存在时才更新
sql = """
UPDATE t_chatroom_member
SET latest_active_time = CURRENT_TIMESTAMP
WHERE chatroom_id = %s AND wxid = %s
"""
self.execute_update(sql, (chatroom_id, wxid))
return True
except Exception as e:
self.LOG.error(f"更新群{chatroom_id}成员{wxid}活跃时间失败: {e}")
return False
def get_inactive_members_rank(self, chatroom_id: str, days: int = 60, limit: int = 10) -> List[Dict]:
try:
sql = """
SELECT
wxid,
COALESCE(NULLIF(display_name,''), nick_name, wxid) AS nick_name,
latest_active_time,
CASE
WHEN latest_active_time IS NULL THEN 999999
ELSE TIMESTAMPDIFF(DAY, latest_active_time, NOW())
END AS inactivity_days
, CASE WHEN latest_active_time IS NULL THEN 1 ELSE 0 END AS never_spoken
FROM t_chatroom_member
WHERE chatroom_id = %s
AND status = 1
AND (latest_active_time IS NULL OR latest_active_time <= DATE_SUB(NOW(), INTERVAL %s DAY))
ORDER BY inactivity_days DESC
LIMIT %s
"""
results = self.execute_query(sql, (chatroom_id, days, limit))
for row in results:
dt = row.get("latest_active_time")
if isinstance(dt, datetime):
row["latest_active_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}潜水排行失败: {e}")
return []
def get_group_member_summary(self, chatroom_id: str, inactive_days: int = 30) -> Dict[str, Any]:
"""获取群成员概览摘要。
说明:
1. 群运营看板里的成员统计口径应当只基于“当前仍在群内”的成员;
2. 已退群成员如果继续参与统计,会导致僵尸成员数、活跃覆盖率等指标失真;
3. 因此这里统一过滤 status = 1确保详情页数字与成员列表保持一致。
"""
try:
sql = """
SELECT
COUNT(*) AS total_members,
COUNT(*) AS in_group_members,
SUM(CASE WHEN is_owner = 1 THEN 1 ELSE 0 END) AS owner_count,
SUM(CASE WHEN is_admin = 1 THEN 1 ELSE 0 END) AS admin_count,
SUM(CASE WHEN latest_active_time IS NOT NULL THEN 1 ELSE 0 END) AS spoken_members,
SUM(CASE WHEN latest_active_time IS NULL THEN 1 ELSE 0 END) AS never_spoken_members,
SUM(
CASE
WHEN latest_active_time IS NOT NULL
AND latest_active_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
THEN 1 ELSE 0
END
) AS active_7d_members,
SUM(
CASE
WHEN latest_active_time IS NOT NULL
AND latest_active_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
THEN 1 ELSE 0
END
) AS active_30d_members,
SUM(
CASE
WHEN latest_active_time IS NULL
OR latest_active_time < DATE_SUB(NOW(), INTERVAL %s DAY)
THEN 1 ELSE 0
END
) AS inactive_members
FROM t_chatroom_member
WHERE chatroom_id = %s
AND status = 1
"""
result = self.execute_query(sql, (inactive_days, chatroom_id), fetch_one=True) or {}
return {
"total_members": int(result.get("total_members") or 0),
"in_group_members": int(result.get("in_group_members") or 0),
"owner_count": int(result.get("owner_count") or 0),
"admin_count": int(result.get("admin_count") or 0),
"spoken_members": int(result.get("spoken_members") or 0),
"never_spoken_members": int(result.get("never_spoken_members") or 0),
"active_7d_members": int(result.get("active_7d_members") or 0),
"active_30d_members": int(result.get("active_30d_members") or 0),
"inactive_members": int(result.get("inactive_members") or 0),
}
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员摘要失败: {e}")
return {
"total_members": 0,
"in_group_members": 0,
"owner_count": 0,
"admin_count": 0,
"spoken_members": 0,
"never_spoken_members": 0,
"active_7d_members": 0,
"active_30d_members": 0,
"inactive_members": 0,
}