Files
abot/db/contacts_db.py
liuwei 4cf5a05088 这次实际改的是按钮背后的同步逻辑,不是页面文案本身。现在 /contacts/api/update 触发后会:
个人联系人、公众号:已存在就跳过,不存在才写入。
群:不再删库重建,已有群保留;新群会写入。
群成员:已存在就跳过,不存在就写入。
如果成员这次不在群里了,会把 db/contacts_db.py 里的 status 标成 2,前端会显示“已退群”。
如果整个群查不到了,也不再删除群资料,只把该群历史成员标记为“已退群”。
改动在:

robot.py
db/contacts_db.py
我还做了语法校验,py_compile 通过。需要的话我也可以继续帮你把“更新通讯录”按钮的成功提示改成更明确,比如“已完成增量同步,未删除历史数据”。
2026-04-15 09:28:44 +08:00

863 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
微信联系人数据库操作类
用于管理微信联系人信息的存储和查询
"""
from datetime import datetime
from loguru import logger
import json
from typing import List, Dict, Optional, Union, Any
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class ContactsDBOperator(BaseDBOperator):
"""微信联系人数据库操作类"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self.LOG = logger
def _ensure_table_exists(self):
"""确保联系人表存在"""
try:
# 创建联系人表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_wechat_contacts (
id INT AUTO_INCREMENT PRIMARY KEY,
user_name VARCHAR(64) NOT NULL COMMENT '微信ID',
nick_name VARCHAR(128) COMMENT '昵称',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
sex TINYINT COMMENT '性别1男2女0未知',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
sns_bg_img TEXT COMMENT '朋友圈背景图',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
description TEXT COMMENT '描述',
card_img_url TEXT COMMENT '名片图片URL',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
type ENUM('friends', 'chatrooms', 'ghs') NOT NULL COMMENT '联系人类型:好友、群聊、公众号',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_user_name` (`user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信联系人信息表';
""")
# 创建群成员表 - 增加了更多字段以支持详细信息
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatroom_member (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
nick_name VARCHAR(128) COMMENT '成员昵称',
display_name VARCHAR(128) COMMENT '群内显示名称',
inviter_user_name VARCHAR(64) COMMENT '邀请人微信ID',
member_flag INT COMMENT '成员标志2049表示管理员',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
is_owner TINYINT(1) DEFAULT 0 COMMENT '是否群主0否1是',
is_admin TINYINT(1) DEFAULT 0 COMMENT '是否管理员0否1是',
sex TINYINT COMMENT '性别1男2女0未知',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
latest_active_time DATETIME NULL COMMENT '最后活跃时间',
status TINYINT DEFAULT 1 COMMENT '1-在群里2-已退群',
UNIQUE KEY `idx_chatroom_member` (`chatroom_id`, `wxid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群成员信息表';
""")
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatrooms (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
nick_name VARCHAR(128) COMMENT '群昵称',
py_initial VARCHAR(128) COMMENT '群昵称拼音首字母',
quan_pin VARCHAR(256) COMMENT '群昵称全拼',
sex TINYINT COMMENT '性别',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
chat_room_notify TINYINT COMMENT '群通知',
chat_room_owner VARCHAR(64) COMMENT '群主微信ID',
small_head_img_url TEXT COMMENT '群头像URL',
member_list TEXT COMMENT '成员列表(JSON)',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_chatroom_id` (`chatroom_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群信息表';
""")
self.LOG.info("成功创建或确认微信群信息表存在")
except Exception as e:
self.LOG.error(f"创建微信联系人表或群成员表失败: {e}")
raise
def save_contacts(self, contacts_data: List[Dict], contact_type: str, only_insert: bool = False) -> bool:
"""保存联系人信息到数据库
Args:
contacts_data: 联系人数据列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
only_insert: 为True时仅写入不存在的联系人已存在则跳过
Returns:
bool: 是否成功保存
"""
if not contacts_data:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for contact in contacts_data:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = contact.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'user_name': get_str('UserName'),
'nick_name': get_str('NickName'),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': contact.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'signature': contact.get('Signature', ''),
'alias': contact.get('Alias', ''),
'sns_bg_img': contact.get('SnsUserInfo', {}).get('SnsBgimgId', ''),
'country': contact.get('Country', ''),
'province': contact.get('Province', ''),
'city': contact.get('City', ''),
'big_head_img_url': contact.get('BigHeadImgUrl', ''),
'small_head_img_url': contact.get('SmallHeadImgUrl', ''),
'description': '', # 可根据需要补充
'card_img_url': '', # 可根据需要补充
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'type': contact_type
}
# 构建SQL语句
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
if only_insert:
sql = f"""
INSERT IGNORE INTO t_wechat_contacts ({fields})
VALUES ({placeholders})
"""
else:
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'user_name'])
sql = f"""
INSERT INTO t_wechat_contacts ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存{len(contacts_data)}{contact_type}类型的联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的联系人失败: {e}")
return False
def save_simple_contacts(self, contact_list: List[str], contact_type: str) -> bool:
"""保存简单联系人列表只有user_name到数据库
Args:
contact_list: 联系人ID列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
bool: 是否成功保存
"""
if not contact_list:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for user_name in contact_list:
# 构建SQL语句
sql = """
INSERT INTO t_wechat_contacts (user_name, type)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE type = VALUES(type), update_time = CURRENT_TIMESTAMP
"""
self.execute_update(sql, (user_name, contact_type))
self.LOG.info(f"成功保存{len(contact_list)}{contact_type}类型的简单联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的简单联系人失败: {e}")
return False
def get_contacts_by_type(self, contact_type: str) -> List[Dict]:
"""根据类型获取联系人列表
Args:
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
List[Dict]: 联系人列表
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE type = %s
ORDER BY nick_name
"""
results = self.execute_query(sql, (contact_type,))
return results
except Exception as e:
self.LOG.error(f"获取{contact_type}类型的联系人失败: {e}")
return []
def get_contact_by_user_name(self, user_name: str) -> Optional[Dict]:
"""根据user_name获取联系人信息
Args:
user_name: 联系人ID
Returns:
Optional[Dict]: 联系人信息如果不存在则返回None
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE user_name = %s
LIMIT 1
"""
result = self.execute_query(sql, (user_name,), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取联系人{user_name}失败: {e}")
return None
def get_display_name(self, user_name: str) -> str:
"""获取联系人的显示名称优先使用备注其次是昵称最后是微信ID
Args:
user_name: 联系人ID
Returns:
str: 显示名称
"""
contact = self.get_contact_by_user_name(user_name)
if not contact:
return user_name
return contact.get('remark') or contact.get('nick_name') or user_name
def get_all_contacts(self) -> Dict[str, str]:
"""获取所有联系人信息
Returns:
Dict[str, str]: 联系人字典,格式为 {"wxid": "NickName"}
"""
try:
sql = """
SELECT user_name, nick_name, remark
FROM t_wechat_contacts
union all
SELECT wxid as user_name, nick_name, display_name as remark
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, nick_name, remark as remark
FROM t_chatrooms
"""
results = self.execute_query(sql)
contacts_dict = {}
for result in results:
user_name = result.get('user_name')
remark = result.get('remark')
nick_name = result.get('nick_name')
# 优先使用备注其次是昵称最后是微信ID。如果存在remark,则用remark(nick_name)
if remark:
if nick_name and remark != nick_name:
display_name = f"{remark}({nick_name})"
else:
display_name = remark
else:
display_name = nick_name or user_name
contacts_dict[user_name] = display_name
self.LOG.info(f"从数据库获取了 {len(contacts_dict)} 个联系人信息")
return contacts_dict
except Exception as e:
self.LOG.error(f"获取所有联系人信息失败: {e}")
return {}
def save_chatroom_member_simple(self, chatroom_id: str, member_details: List[Dict], only_insert: bool = False) -> bool:
"""
保存群成员简要信息到数据库,兼容不同数据结构
Args:
chatroom_id: 群聊ID
member_details: 群成员信息列表
Returns:
bool: 是否成功保存
"""
if not member_details:
self.LOG.warning(f"{chatroom_id} 没有成员数据需要保存")
return True
try:
for member in member_details:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = member.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'chatroom_id': chatroom_id,
'wxid': get_str('UserName'),
'nick_name': get_str('NickName'),
'display_name': get_str('DisplayName'),
'inviter_user_name': get_str('InviterUserName'),
'member_flag': member.get('MemberFlag', 0),
'big_head_img_url': get_str('BigHeadImgUrl'),
'small_head_img_url': get_str('SmallHeadImgUrl'),
'is_owner': 1 if member.get('IsOwner', False) else 0,
'is_admin': 1 if member.get('IsAdmin', False) else 0,
'sex': member.get('Sex', 0),
'signature': get_str('Signature'),
'alias': get_str('Alias'),
'country': get_str('Country'),
'province': get_str('Province'),
'city': get_str('City'),
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'status': 1,
}
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
if only_insert:
sql = f"""
INSERT IGNORE INTO t_chatroom_member ({fields})
VALUES ({placeholders})
"""
else:
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k not in ('chatroom_id', 'wxid')])
sql = f"""
INSERT INTO t_chatroom_member ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群 {chatroom_id}{len(member_details)} 个成员信息")
return True
except Exception as e:
self.LOG.error(f"保存群 {chatroom_id} 成员信息失败: {e}")
return False
def save_chatroom_member_detail(self, chatroom_id: str, member_details: List[Dict]) -> bool:
"""保存群成员详细信息到数据库
Args:
chatroom_id: 群聊ID
member_details: 群成员详细信息列表
Returns:
bool: 是否成功保存
"""
if not member_details or not chatroom_id:
self.LOG.warning(f"没有群聊{chatroom_id}的成员详细信息需要保存")
return False
try:
# 获取现有的群成员信息,以便更新而不是替换
existing_members_sql = """
SELECT wxid, is_owner, is_admin FROM t_chatroom_member
WHERE chatroom_id = %s
"""
existing_members_result = self.execute_query(existing_members_sql, (chatroom_id,))
existing_members = {row.get('wxid'): (row.get('is_owner'), row.get('is_admin'))
for row in existing_members_result}
for member in member_details:
wxid = member.get('userName', '')
if not wxid:
continue
# 保留现有的群主和管理员标识
is_owner, is_admin = 0, 0
if wxid in existing_members:
is_owner, is_admin = existing_members[wxid]
# 处理电话号码列表
phone_num_list = member.get('phoneNumList', [])
if phone_num_list:
phone_num_str = json.dumps(phone_num_list)
else:
phone_num_str = ''
# 构建数据
data = {
'chatroom_id': chatroom_id,
'wxid': wxid,
'nick_name': member.get('nickName', ''),
'display_name': member.get('remark', ''), # 使用备注作为群内显示名称
'inviter_user_name': member.get('inviterUserName', ''),
'member_flag': member.get('memberFlag', 0),
'big_head_img_url': member.get('bigHeadImgUrl', ''),
'small_head_img_url': member.get('smallHeadImgUrl', ''),
'is_owner': is_owner,
'is_admin': is_admin,
# 额外的详细信息字段
'sex': member.get('sex', 0),
'signature': member.get('signature', ''),
'alias': member.get('alias', ''),
'country': member.get('country', ''),
'province': member.get('province', ''),
'city': member.get('city', ''),
'label_list': member.get('labelList', ''),
'phone_num_list': phone_num_str,
'py_initial': member.get('pyInitial', ''),
'quan_pin': member.get('quanPin', ''),
'remark_py_initial': member.get('remarkPyInitial', ''),
'remark_quan_pin': member.get('remarkQuanPin', '')
}
# 构建SQL语句 - 使用REPLACE INTO确保更新现有记录
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
sql = f"""
REPLACE INTO t_chatroom_member ({fields})
VALUES ({placeholders})
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊{chatroom_id}{len(member_details)}个成员详细信息")
return True
except Exception as e:
self.LOG.error(f"保存群聊{chatroom_id}的成员详细信息失败: {e}")
return False
def process_chatroom_member_detail_response(self, chatroom_id: str, response: Dict) -> bool:
"""处理获取群成员详情的API响应
Args:
chatroom_id: 群聊ID
response: API响应数据
Returns:
bool: 是否成功处理
"""
try:
if response.get('ret') != 200:
self.LOG.error(f"获取群聊{chatroom_id}成员详情失败: {response.get('msg')}")
return False
data = response.get('data', [])
if not data:
self.LOG.warning(f"群聊{chatroom_id}成员详情数据为空")
return False
return self.save_chatroom_member_detail(chatroom_id, data)
except Exception as e:
self.LOG.error(f"处理群聊{chatroom_id}成员详情数据失败: {e}")
return False
def save_chatroom_info(self, chatroom_data: dict) -> bool:
"""保存群信息到数据库,兼容微信协议风格字段"""
try:
def get_str(field, default=""):
val = chatroom_data.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'chatroom_id': get_str('UserName'),
'nick_name': get_str('NickName',"未知群名"),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': chatroom_data.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'chat_room_notify': chatroom_data.get('ChatRoomNotify', 0),
'chat_room_owner': chatroom_data.get('ChatRoomOwner', ''),
'small_head_img_url': chatroom_data.get('SmallHeadImgUrl', ''),
# 成员列表可选存储为JSON字符串
'member_list': json.dumps(chatroom_data.get('NewChatroomData', {}).get('ChatRoomMember', []),
ensure_ascii=False)
}
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'chatroom_id'])
sql = f"""
INSERT INTO t_chatrooms ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊信息: {data['chatroom_id']}")
return True
except Exception as e:
self.LOG.error(f"保存群聊信息失败: {e}")
return False
def get_chatroom_info(self, chatroom_id: str) -> Optional[dict]:
"""获取群信息"""
try:
sql = "SELECT * FROM t_chatrooms WHERE chatroom_id = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id,), fetch_one=True)
if result and result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return result
except Exception as e:
self.LOG.error(f"获取群聊{chatroom_id}信息失败: {e}")
return None
def update_chatroom_info(self, chatroom_id: str, update_data: dict) -> bool:
"""更新群信息"""
try:
set_clause = ', '.join([f"{k}=%s" for k in update_data.keys()])
values = list(update_data.values())
values.append(chatroom_id)
sql = f"UPDATE t_chatrooms SET {set_clause} WHERE chatroom_id = %s"
self.execute_update(sql, tuple(values))
self.LOG.info(f"成功更新群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"更新群聊{chatroom_id}信息失败: {e}")
return False
def mark_chatroom_members_left(self, chatroom_id: str, active_member_wxids: Optional[List[str]] = None) -> bool:
"""将不在当前群成员列表中的用户标记为已退群。"""
try:
active_member_wxids = [wxid for wxid in (active_member_wxids or []) if wxid]
if active_member_wxids:
placeholders = ', '.join(['%s'] * len(active_member_wxids))
sql = f"""
UPDATE t_chatroom_member
SET status = 2
WHERE chatroom_id = %s AND wxid NOT IN ({placeholders})
"""
params = (chatroom_id, *active_member_wxids)
else:
sql = """
UPDATE t_chatroom_member
SET status = 2
WHERE chatroom_id = %s
"""
params = (chatroom_id,)
self.execute_update(sql, params)
self.LOG.info(f"已将群 {chatroom_id} 中缺失成员标记为已退群")
return True
except Exception as e:
self.LOG.error(f"标记群{chatroom_id}成员退群失败: {e}")
return False
def mark_chatroom_members_active(self, chatroom_id: str, active_member_wxids: List[str]) -> bool:
"""将当前仍在群里的成员标记为在群。"""
try:
active_member_wxids = [wxid for wxid in (active_member_wxids or []) if wxid]
if not active_member_wxids:
return True
placeholders = ', '.join(['%s'] * len(active_member_wxids))
sql = f"""
UPDATE t_chatroom_member
SET status = 1
WHERE chatroom_id = %s AND wxid IN ({placeholders})
"""
self.execute_update(sql, (chatroom_id, *active_member_wxids))
return True
except Exception as e:
self.LOG.error(f"恢复群{chatroom_id}成员在群状态失败: {e}")
return False
def delete_chatroom_info(self, chatroom_id: str) -> bool:
"""删除群信息"""
try:
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}信息失败: {e}")
return False
def delete_chatroom_members_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 的成员信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}的成员信息失败: {e}")
return False
# 新增获取群列表接口
def get_chatroom_list(self) -> List[dict]:
"""获取群列表"""
try:
sql = "SELECT * FROM t_chatrooms"
results = self.execute_query(sql)
for result in results:
if result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return results
except Exception as e:
self.LOG.error(f"获取群列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s"
results = self.execute_query(sql, (chatroom_id,))
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_small_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT wxid,nick_name,display_name,status,latest_active_time FROM t_chatroom_member WHERE chatroom_id = %s"
results = self.execute_query(sql, (chatroom_id,))
for row in results:
dt = row.get("latest_active_time")
if isinstance(dt, datetime):
row["latest_active_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
# 获取群成员的昵称信息
def get_chatroom_member_list_name_all(self) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT chatroom_id,wxid, COALESCE(NULLIF(display_name,''), nick_name, wxid) as nick_name FROM t_chatroom_member"
results = self.execute_query(sql)
return results
except Exception as e:
self.LOG.error(f"获取群成员列表失败: {e}")
return []
# 新增获取群成员信息接口
def get_chatroom_member_info(self, chatroom_id: str, wxid: str) -> Optional[dict]:
"""获取群成员信息"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s AND wxid = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id, wxid), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员{wxid}信息失败: {e}")
return None
# 新增群信息删除功能
def delete_chatroom_all_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群{chatroom_id}信息")
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群{chatroom_id}信息失败: {e}")
return False
# 新增删除所有联系人功能
def delete_all_contacts(self) -> bool:
"""删除所有联系人信息"""
try:
sql = "DELETE FROM t_wechat_contacts"
self.execute_update(sql)
self.LOG.info(f"成功删除所有联系人信息")
return True
except Exception as e:
self.LOG.error(f"删除所有联系人信息失败: {e}")
return False
# 新增获取所有联系人头像信息接口
def get_all_contacts_avatar(self) -> Dict[str, str]:
"""获取所有联系人头像信息"""
try:
sql = """
SELECT user_name, small_head_img_url
FROM t_wechat_contacts
union all
SELECT wxid as user_name, small_head_img_url
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, small_head_img_url
FROM t_chatrooms
"""
results = self.execute_query(sql)
# 返回DICT
results = {result['user_name']: result['small_head_img_url'] for result in results}
return results
except Exception as e:
self.LOG.error(f"获取所有联系人头像信息失败: {e}")
return []
# 新增更新群成员最后活跃时间接口
def update_chatroom_member_active_time(self, chatroom_id: str, wxid: str) -> bool:
"""更新群成员的最后活跃时间"""
try:
# 只有当成员确实存在时才更新
sql = """
UPDATE t_chatroom_member
SET latest_active_time = CURRENT_TIMESTAMP
WHERE chatroom_id = %s AND wxid = %s
"""
self.execute_update(sql, (chatroom_id, wxid))
return True
except Exception as e:
self.LOG.error(f"更新群{chatroom_id}成员{wxid}活跃时间失败: {e}")
return False
def get_inactive_members_rank(self, chatroom_id: str, days: int = 60, limit: int = 10) -> List[Dict]:
try:
sql = """
SELECT
wxid,
COALESCE(NULLIF(display_name,''), nick_name, wxid) AS nick_name,
latest_active_time,
CASE
WHEN latest_active_time IS NULL THEN 999999
ELSE TIMESTAMPDIFF(DAY, latest_active_time, NOW())
END AS inactivity_days
, CASE WHEN latest_active_time IS NULL THEN 1 ELSE 0 END AS never_spoken
FROM t_chatroom_member
WHERE chatroom_id = %s
AND (latest_active_time IS NULL OR latest_active_time <= DATE_SUB(NOW(), INTERVAL %s DAY))
ORDER BY inactivity_days DESC
LIMIT %s
"""
results = self.execute_query(sql, (chatroom_id, days, limit))
for row in results:
dt = row.get("latest_active_time")
if isinstance(dt, datetime):
row["latest_active_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}潜水排行失败: {e}")
return []
def get_group_member_summary(self, chatroom_id: str, inactive_days: int = 30) -> Dict[str, Any]:
"""获取群成员概览摘要"""
try:
sql = """
SELECT
COUNT(*) AS total_members,
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS in_group_members,
SUM(CASE WHEN is_owner = 1 THEN 1 ELSE 0 END) AS owner_count,
SUM(CASE WHEN is_admin = 1 THEN 1 ELSE 0 END) AS admin_count,
SUM(CASE WHEN latest_active_time IS NOT NULL THEN 1 ELSE 0 END) AS spoken_members,
SUM(CASE WHEN latest_active_time IS NULL THEN 1 ELSE 0 END) AS never_spoken_members,
SUM(
CASE
WHEN latest_active_time IS NOT NULL
AND latest_active_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
THEN 1 ELSE 0
END
) AS active_7d_members,
SUM(
CASE
WHEN latest_active_time IS NOT NULL
AND latest_active_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
THEN 1 ELSE 0
END
) AS active_30d_members,
SUM(
CASE
WHEN latest_active_time IS NULL
OR latest_active_time < DATE_SUB(NOW(), INTERVAL %s DAY)
THEN 1 ELSE 0
END
) AS inactive_members
FROM t_chatroom_member
WHERE chatroom_id = %s
"""
result = self.execute_query(sql, (inactive_days, chatroom_id), fetch_one=True) or {}
return {
"total_members": int(result.get("total_members") or 0),
"in_group_members": int(result.get("in_group_members") or 0),
"owner_count": int(result.get("owner_count") or 0),
"admin_count": int(result.get("admin_count") or 0),
"spoken_members": int(result.get("spoken_members") or 0),
"never_spoken_members": int(result.get("never_spoken_members") or 0),
"active_7d_members": int(result.get("active_7d_members") or 0),
"active_30d_members": int(result.get("active_30d_members") or 0),
"inactive_members": int(result.get("inactive_members") or 0),
}
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员摘要失败: {e}")
return {
"total_members": 0,
"in_group_members": 0,
"owner_count": 0,
"admin_count": 0,
"spoken_members": 0,
"never_spoken_members": 0,
"active_7d_members": 0,
"active_30d_members": 0,
"inactive_members": 0,
}