Files
abot/db/contacts_db.py
2026-01-19 17:39:52 +08:00

715 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
微信联系人数据库操作类
用于管理微信联系人信息的存储和查询
"""
from datetime import datetime
from loguru import logger
import json
from typing import List, Dict, Optional, Union, Any
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class ContactsDBOperator(BaseDBOperator):
"""微信联系人数据库操作类"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self.LOG = logger
def _ensure_table_exists(self):
"""确保联系人表存在"""
try:
# 创建联系人表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_wechat_contacts (
id INT AUTO_INCREMENT PRIMARY KEY,
user_name VARCHAR(64) NOT NULL COMMENT '微信ID',
nick_name VARCHAR(128) COMMENT '昵称',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
sex TINYINT COMMENT '性别1男2女0未知',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
sns_bg_img TEXT COMMENT '朋友圈背景图',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
description TEXT COMMENT '描述',
card_img_url TEXT COMMENT '名片图片URL',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
type ENUM('friends', 'chatrooms', 'ghs') NOT NULL COMMENT '联系人类型:好友、群聊、公众号',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_user_name` (`user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信联系人信息表';
""")
# 创建群成员表 - 增加了更多字段以支持详细信息
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatroom_member (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
nick_name VARCHAR(128) COMMENT '成员昵称',
display_name VARCHAR(128) COMMENT '群内显示名称',
inviter_user_name VARCHAR(64) COMMENT '邀请人微信ID',
member_flag INT COMMENT '成员标志2049表示管理员',
big_head_img_url TEXT COMMENT '大头像URL',
small_head_img_url TEXT COMMENT '小头像URL',
is_owner TINYINT(1) DEFAULT 0 COMMENT '是否群主0否1是',
is_admin TINYINT(1) DEFAULT 0 COMMENT '是否管理员0否1是',
sex TINYINT COMMENT '性别1男2女0未知',
signature TEXT COMMENT '个性签名',
alias VARCHAR(128) COMMENT '微信号',
country VARCHAR(64) COMMENT '国家',
province VARCHAR(64) COMMENT '省份',
city VARCHAR(64) COMMENT '城市',
label_list TEXT COMMENT '标签列表',
phone_num_list TEXT COMMENT '电话号码列表',
py_initial VARCHAR(128) COMMENT '拼音首字母',
quan_pin VARCHAR(256) COMMENT '全拼',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_chatroom_member` (`chatroom_id`, `wxid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群成员信息表';
""")
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_chatrooms (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
nick_name VARCHAR(128) COMMENT '群昵称',
py_initial VARCHAR(128) COMMENT '群昵称拼音首字母',
quan_pin VARCHAR(256) COMMENT '群昵称全拼',
sex TINYINT COMMENT '性别',
remark VARCHAR(128) COMMENT '备注',
remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母',
remark_quan_pin VARCHAR(256) COMMENT '备注全拼',
chat_room_notify TINYINT COMMENT '群通知',
chat_room_owner VARCHAR(64) COMMENT '群主微信ID',
small_head_img_url TEXT COMMENT '群头像URL',
member_list TEXT COMMENT '成员列表(JSON)',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `idx_chatroom_id` (`chatroom_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群信息表';
""")
self.LOG.info("成功创建或确认微信群信息表存在")
except Exception as e:
self.LOG.error(f"创建微信联系人表或群成员表失败: {e}")
raise
def save_contacts(self, contacts_data: List[Dict], contact_type: str) -> bool:
"""保存联系人信息到数据库
Args:
contacts_data: 联系人数据列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
bool: 是否成功保存
"""
if not contacts_data:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for contact in contacts_data:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = contact.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'user_name': get_str('UserName'),
'nick_name': get_str('NickName'),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': contact.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'signature': contact.get('Signature', ''),
'alias': contact.get('Alias', ''),
'sns_bg_img': contact.get('SnsUserInfo', {}).get('SnsBgimgId', ''),
'country': contact.get('Country', ''),
'province': contact.get('Province', ''),
'city': contact.get('City', ''),
'big_head_img_url': contact.get('BigHeadImgUrl', ''),
'small_head_img_url': contact.get('SmallHeadImgUrl', ''),
'description': '', # 可根据需要补充
'card_img_url': '', # 可根据需要补充
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'type': contact_type
}
# 构建SQL语句
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
# 使用INSERT ... ON DUPLICATE KEY UPDATE语法
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'user_name'])
sql = f"""
INSERT INTO t_wechat_contacts ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存{len(contacts_data)}{contact_type}类型的联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的联系人失败: {e}")
return False
def save_simple_contacts(self, contact_list: List[str], contact_type: str) -> bool:
"""保存简单联系人列表只有user_name到数据库
Args:
contact_list: 联系人ID列表
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
bool: 是否成功保存
"""
if not contact_list:
self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存")
return True
try:
for user_name in contact_list:
# 构建SQL语句
sql = """
INSERT INTO t_wechat_contacts (user_name, type)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE type = VALUES(type), update_time = CURRENT_TIMESTAMP
"""
self.execute_update(sql, (user_name, contact_type))
self.LOG.info(f"成功保存{len(contact_list)}{contact_type}类型的简单联系人")
return True
except Exception as e:
self.LOG.error(f"保存{contact_type}类型的简单联系人失败: {e}")
return False
def get_contacts_by_type(self, contact_type: str) -> List[Dict]:
"""根据类型获取联系人列表
Args:
contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs'
Returns:
List[Dict]: 联系人列表
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE type = %s
ORDER BY nick_name
"""
results = self.execute_query(sql, (contact_type,))
return results
except Exception as e:
self.LOG.error(f"获取{contact_type}类型的联系人失败: {e}")
return []
def get_contact_by_user_name(self, user_name: str) -> Optional[Dict]:
"""根据user_name获取联系人信息
Args:
user_name: 联系人ID
Returns:
Optional[Dict]: 联系人信息如果不存在则返回None
"""
try:
sql = """
SELECT * FROM t_wechat_contacts
WHERE user_name = %s
LIMIT 1
"""
result = self.execute_query(sql, (user_name,), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取联系人{user_name}失败: {e}")
return None
def get_display_name(self, user_name: str) -> str:
"""获取联系人的显示名称优先使用备注其次是昵称最后是微信ID
Args:
user_name: 联系人ID
Returns:
str: 显示名称
"""
contact = self.get_contact_by_user_name(user_name)
if not contact:
return user_name
return contact.get('remark') or contact.get('nick_name') or user_name
def get_all_contacts(self) -> Dict[str, str]:
"""获取所有联系人信息
Returns:
Dict[str, str]: 联系人字典,格式为 {"wxid": "NickName"}
"""
try:
sql = """
SELECT user_name, nick_name, remark
FROM t_wechat_contacts
union all
SELECT wxid as user_name, nick_name, display_name as remark
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, nick_name, remark as remark
FROM t_chatrooms
"""
results = self.execute_query(sql)
contacts_dict = {}
for result in results:
user_name = result.get('user_name')
remark = result.get('remark')
nick_name = result.get('nick_name')
# 优先使用备注其次是昵称最后是微信ID。如果存在remark,则用remark(nick_name)
if remark:
if nick_name and remark != nick_name:
display_name = f"{remark}({nick_name})"
else:
display_name = remark
else:
display_name = nick_name or user_name
contacts_dict[user_name] = display_name
self.LOG.info(f"从数据库获取了 {len(contacts_dict)} 个联系人信息")
return contacts_dict
except Exception as e:
self.LOG.error(f"获取所有联系人信息失败: {e}")
return {}
def save_chatroom_member_simple(self, chatroom_id: str, member_details: List[Dict]) -> bool:
"""
保存群成员简要信息到数据库,兼容不同数据结构
Args:
chatroom_id: 群聊ID
member_details: 群成员信息列表
Returns:
bool: 是否成功保存
"""
if not member_details:
self.LOG.warning(f"{chatroom_id} 没有成员数据需要保存")
return True
try:
for member in member_details:
# 兼容微信协议风格的数据结构
def get_str(field, default=""):
val = member.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'chatroom_id': chatroom_id,
'wxid': get_str('UserName'),
'nick_name': get_str('NickName'),
'display_name': get_str('DisplayName'),
'inviter_user_name': get_str('InviterUserName'),
'member_flag': member.get('MemberFlag', 0),
'big_head_img_url': get_str('BigHeadImgUrl'),
'small_head_img_url': get_str('SmallHeadImgUrl'),
'is_owner': 1 if member.get('IsOwner', False) else 0,
'is_admin': 1 if member.get('IsAdmin', False) else 0,
'sex': member.get('Sex', 0),
'signature': get_str('Signature'),
'alias': get_str('Alias'),
'country': get_str('Country'),
'province': get_str('Province'),
'city': get_str('City'),
'label_list': '', # 可根据需要补充
'phone_num_list': '', # 可根据需要补充
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
}
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k not in ('chatroom_id', 'wxid')])
sql = f"""
INSERT INTO t_chatroom_member ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群 {chatroom_id}{len(member_details)} 个成员信息")
return True
except Exception as e:
self.LOG.error(f"保存群 {chatroom_id} 成员信息失败: {e}")
return False
def save_chatroom_member_detail(self, chatroom_id: str, member_details: List[Dict]) -> bool:
"""保存群成员详细信息到数据库
Args:
chatroom_id: 群聊ID
member_details: 群成员详细信息列表
Returns:
bool: 是否成功保存
"""
if not member_details or not chatroom_id:
self.LOG.warning(f"没有群聊{chatroom_id}的成员详细信息需要保存")
return False
try:
# 获取现有的群成员信息,以便更新而不是替换
existing_members_sql = """
SELECT wxid, is_owner, is_admin FROM t_chatroom_member
WHERE chatroom_id = %s
"""
existing_members_result = self.execute_query(existing_members_sql, (chatroom_id,))
existing_members = {row.get('wxid'): (row.get('is_owner'), row.get('is_admin'))
for row in existing_members_result}
for member in member_details:
wxid = member.get('userName', '')
if not wxid:
continue
# 保留现有的群主和管理员标识
is_owner, is_admin = 0, 0
if wxid in existing_members:
is_owner, is_admin = existing_members[wxid]
# 处理电话号码列表
phone_num_list = member.get('phoneNumList', [])
if phone_num_list:
phone_num_str = json.dumps(phone_num_list)
else:
phone_num_str = ''
# 构建数据
data = {
'chatroom_id': chatroom_id,
'wxid': wxid,
'nick_name': member.get('nickName', ''),
'display_name': member.get('remark', ''), # 使用备注作为群内显示名称
'inviter_user_name': member.get('inviterUserName', ''),
'member_flag': member.get('memberFlag', 0),
'big_head_img_url': member.get('bigHeadImgUrl', ''),
'small_head_img_url': member.get('smallHeadImgUrl', ''),
'is_owner': is_owner,
'is_admin': is_admin,
# 额外的详细信息字段
'sex': member.get('sex', 0),
'signature': member.get('signature', ''),
'alias': member.get('alias', ''),
'country': member.get('country', ''),
'province': member.get('province', ''),
'city': member.get('city', ''),
'label_list': member.get('labelList', ''),
'phone_num_list': phone_num_str,
'py_initial': member.get('pyInitial', ''),
'quan_pin': member.get('quanPin', ''),
'remark_py_initial': member.get('remarkPyInitial', ''),
'remark_quan_pin': member.get('remarkQuanPin', '')
}
# 构建SQL语句 - 使用REPLACE INTO确保更新现有记录
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
sql = f"""
REPLACE INTO t_chatroom_member ({fields})
VALUES ({placeholders})
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊{chatroom_id}{len(member_details)}个成员详细信息")
return True
except Exception as e:
self.LOG.error(f"保存群聊{chatroom_id}的成员详细信息失败: {e}")
return False
def process_chatroom_member_detail_response(self, chatroom_id: str, response: Dict) -> bool:
"""处理获取群成员详情的API响应
Args:
chatroom_id: 群聊ID
response: API响应数据
Returns:
bool: 是否成功处理
"""
try:
if response.get('ret') != 200:
self.LOG.error(f"获取群聊{chatroom_id}成员详情失败: {response.get('msg')}")
return False
data = response.get('data', [])
if not data:
self.LOG.warning(f"群聊{chatroom_id}成员详情数据为空")
return False
return self.save_chatroom_member_detail(chatroom_id, data)
except Exception as e:
self.LOG.error(f"处理群聊{chatroom_id}成员详情数据失败: {e}")
return False
def save_chatroom_info(self, chatroom_data: dict) -> bool:
"""保存群信息到数据库,兼容微信协议风格字段"""
try:
def get_str(field, default=""):
val = chatroom_data.get(field, default)
if isinstance(val, dict):
return val.get("string", default)
return val if val is not None else default
data = {
'chatroom_id': get_str('UserName'),
'nick_name': get_str('NickName',"未知群名"),
'py_initial': get_str('Pyinitial'),
'quan_pin': get_str('QuanPin'),
'sex': chatroom_data.get('Sex', 0),
'remark': get_str('Remark'),
'remark_py_initial': get_str('RemarkPyinitial'),
'remark_quan_pin': get_str('RemarkQuanPin'),
'chat_room_notify': chatroom_data.get('ChatRoomNotify', 0),
'chat_room_owner': chatroom_data.get('ChatRoomOwner', ''),
'small_head_img_url': chatroom_data.get('SmallHeadImgUrl', ''),
# 成员列表可选存储为JSON字符串
'member_list': json.dumps(chatroom_data.get('NewChatroomData', {}).get('ChatRoomMember', []),
ensure_ascii=False)
}
fields = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'chatroom_id'])
sql = f"""
INSERT INTO t_chatrooms ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
self.execute_update(sql, values)
self.LOG.info(f"成功保存群聊信息: {data['chatroom_id']}")
return True
except Exception as e:
self.LOG.error(f"保存群聊信息失败: {e}")
return False
def get_chatroom_info(self, chatroom_id: str) -> Optional[dict]:
"""获取群信息"""
try:
sql = "SELECT * FROM t_chatrooms WHERE chatroom_id = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id,), fetch_one=True)
if result and result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return result
except Exception as e:
self.LOG.error(f"获取群聊{chatroom_id}信息失败: {e}")
return None
def update_chatroom_info(self, chatroom_id: str, update_data: dict) -> bool:
"""更新群信息"""
try:
set_clause = ', '.join([f"{k}=%s" for k in update_data.keys()])
values = list(update_data.values())
values.append(chatroom_id)
sql = f"UPDATE t_chatrooms SET {set_clause} WHERE chatroom_id = %s"
self.execute_update(sql, tuple(values))
self.LOG.info(f"成功更新群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"更新群聊{chatroom_id}信息失败: {e}")
return False
def delete_chatroom_info(self, chatroom_id: str) -> bool:
"""删除群信息"""
try:
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}信息失败: {e}")
return False
def delete_chatroom_members_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 的成员信息")
return True
except Exception as e:
self.LOG.error(f"删除群聊{chatroom_id}的成员信息失败: {e}")
return False
# 新增获取群列表接口
def get_chatroom_list(self) -> List[dict]:
"""获取群列表"""
try:
sql = "SELECT * FROM t_chatrooms"
results = self.execute_query(sql)
for result in results:
if result.get('member_list'):
result['member_list'] = json.loads(result['member_list'])
return results
except Exception as e:
self.LOG.error(f"获取群列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s"
results = self.execute_query(sql, (chatroom_id,))
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
# 新增获取群成员列表接口
def get_chatroom_small_member_list(self, chatroom_id: str) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT wxid,nick_name,display_name,status,latest_active_time FROM t_chatroom_member WHERE chatroom_id = %s"
results = self.execute_query(sql, (chatroom_id,))
for row in results:
dt = row.get("latest_active_time")
if isinstance(dt, datetime):
row["latest_active_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return results
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员列表失败: {e}")
return []
# 获取群成员的昵称信息
def get_chatroom_member_list_name_all(self) -> List[dict]:
"""获取群成员列表"""
try:
sql = "SELECT chatroom_id,wxid, COALESCE(NULLIF(display_name,''), nick_name, wxid) as nick_name FROM t_chatroom_member"
results = self.execute_query(sql)
return results
except Exception as e:
self.LOG.error(f"获取群成员列表失败: {e}")
return []
# 新增获取群成员信息接口
def get_chatroom_member_info(self, chatroom_id: str, wxid: str) -> Optional[dict]:
"""获取群成员信息"""
try:
sql = "SELECT * FROM t_chatroom_member WHERE chatroom_id = %s AND wxid = %s LIMIT 1"
result = self.execute_query(sql, (chatroom_id, wxid), fetch_one=True)
return result
except Exception as e:
self.LOG.error(f"获取群{chatroom_id}成员{wxid}信息失败: {e}")
return None
# 新增群信息删除功能
def delete_chatroom_all_info(self, chatroom_id: str) -> bool:
"""删除群成员信息"""
try:
sql = "DELETE FROM t_chatroom_member WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群{chatroom_id}信息")
sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s"
self.execute_update(sql, (chatroom_id,))
self.LOG.info(f"成功删除群聊 {chatroom_id} 信息")
return True
except Exception as e:
self.LOG.error(f"删除群{chatroom_id}信息失败: {e}")
return False
# 新增删除所有联系人功能
def delete_all_contacts(self) -> bool:
"""删除所有联系人信息"""
try:
sql = "DELETE FROM t_wechat_contacts"
self.execute_update(sql)
self.LOG.info(f"成功删除所有联系人信息")
return True
except Exception as e:
self.LOG.error(f"删除所有联系人信息失败: {e}")
return False
# 新增获取所有联系人头像信息接口
def get_all_contacts_avatar(self) -> Dict[str, str]:
"""获取所有联系人头像信息"""
try:
sql = """
SELECT user_name, small_head_img_url
FROM t_wechat_contacts
union all
SELECT wxid as user_name, small_head_img_url
FROM t_chatroom_member
union all
SELECT chatroom_id as user_name, small_head_img_url
FROM t_chatrooms
"""
results = self.execute_query(sql)
# 返回DICT
results = {result['user_name']: result['small_head_img_url'] for result in results}
return results
except Exception as e:
self.LOG.error(f"获取所有联系人头像信息失败: {e}")
return []
# 新增更新群成员最后活跃时间接口
def update_chatroom_member_active_time(self, chatroom_id: str, wxid: str) -> bool:
"""更新群成员的最后活跃时间"""
try:
# 只有当成员确实存在时才更新
sql = """
UPDATE t_chatroom_member
SET latest_active_time = CURRENT_TIMESTAMP
WHERE chatroom_id = %s AND wxid = %s
"""
self.execute_update(sql, (chatroom_id, wxid))
return True
except Exception as e:
self.LOG.error(f"更新群{chatroom_id}成员{wxid}活跃时间失败: {e}")
return False