# -*- coding: utf-8 -*- """ 微信联系人数据库操作类 用于管理微信联系人信息的存储和查询 """ from loguru import logger import json from typing import List, Dict, Optional, Union, Any from db.base import BaseDBOperator from db.connection import DBConnectionManager class ContactsDBOperator(BaseDBOperator): """微信联系人数据库操作类""" def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) self.LOG = logger def _ensure_table_exists(self): """确保联系人表存在""" try: # 创建联系人表 self.execute_update(""" CREATE TABLE IF NOT EXISTS t_wechat_contacts ( id INT AUTO_INCREMENT PRIMARY KEY, user_name VARCHAR(64) NOT NULL COMMENT '微信ID', nick_name VARCHAR(128) COMMENT '昵称', py_initial VARCHAR(128) COMMENT '拼音首字母', quan_pin VARCHAR(256) COMMENT '全拼', sex TINYINT COMMENT '性别:1男,2女,0未知', remark VARCHAR(128) COMMENT '备注', remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母', remark_quan_pin VARCHAR(256) COMMENT '备注全拼', signature TEXT COMMENT '个性签名', alias VARCHAR(128) COMMENT '微信号', sns_bg_img TEXT COMMENT '朋友圈背景图', country VARCHAR(64) COMMENT '国家', province VARCHAR(64) COMMENT '省份', city VARCHAR(64) COMMENT '城市', big_head_img_url TEXT COMMENT '大头像URL', small_head_img_url TEXT COMMENT '小头像URL', description TEXT COMMENT '描述', card_img_url TEXT COMMENT '名片图片URL', label_list TEXT COMMENT '标签列表', phone_num_list TEXT COMMENT '电话号码列表', type ENUM('friends', 'chatrooms', 'ghs') NOT NULL COMMENT '联系人类型:好友、群聊、公众号', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY `idx_user_name` (`user_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信联系人信息表'; """) # 创建群成员表 - 增加了更多字段以支持详细信息 self.execute_update(""" CREATE TABLE IF NOT EXISTS t_chatroom_member ( id INT AUTO_INCREMENT PRIMARY KEY, chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID', nick_name VARCHAR(128) COMMENT '成员昵称', display_name VARCHAR(128) COMMENT '群内显示名称', inviter_user_name VARCHAR(64) COMMENT '邀请人微信ID', member_flag INT COMMENT '成员标志,2049表示管理员', big_head_img_url TEXT COMMENT '大头像URL', small_head_img_url TEXT COMMENT '小头像URL', is_owner TINYINT(1) DEFAULT 0 COMMENT '是否群主:0否,1是', is_admin TINYINT(1) DEFAULT 0 COMMENT '是否管理员:0否,1是', sex TINYINT COMMENT '性别:1男,2女,0未知', signature TEXT COMMENT '个性签名', alias VARCHAR(128) COMMENT '微信号', country VARCHAR(64) COMMENT '国家', province VARCHAR(64) COMMENT '省份', city VARCHAR(64) COMMENT '城市', label_list TEXT COMMENT '标签列表', phone_num_list TEXT COMMENT '电话号码列表', py_initial VARCHAR(128) COMMENT '拼音首字母', quan_pin VARCHAR(256) COMMENT '全拼', remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母', remark_quan_pin VARCHAR(256) COMMENT '备注全拼', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY `idx_chatroom_member` (`chatroom_id`, `wxid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群成员信息表'; """) self.execute_update(""" CREATE TABLE IF NOT EXISTS t_chatrooms ( id INT AUTO_INCREMENT PRIMARY KEY, chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', nick_name VARCHAR(128) COMMENT '群昵称', py_initial VARCHAR(128) COMMENT '群昵称拼音首字母', quan_pin VARCHAR(256) COMMENT '群昵称全拼', sex TINYINT COMMENT '性别', remark VARCHAR(128) COMMENT '备注', remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母', remark_quan_pin VARCHAR(256) COMMENT '备注全拼', chat_room_notify TINYINT COMMENT '群通知', chat_room_owner VARCHAR(64) COMMENT '群主微信ID', small_head_img_url TEXT COMMENT '群头像URL', member_list TEXT COMMENT '成员列表(JSON)', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY `idx_chatroom_id` (`chatroom_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群信息表'; """) self.LOG.info("成功创建或确认微信群信息表存在") except Exception as e: self.LOG.error(f"创建微信联系人表或群成员表失败: {e}") raise def save_contacts(self, contacts_data: List[Dict], contact_type: str) -> bool: """保存联系人信息到数据库 Args: contacts_data: 联系人数据列表 contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: bool: 是否成功保存 """ if not contacts_data: self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存") return True try: for contact in contacts_data: # 将驼峰命名转换为下划线命名 data = { 'user_name': contact.get('userName', ''), 'nick_name': contact.get('nickName', ''), 'py_initial': contact.get('pyInitial', ''), 'quan_pin': contact.get('quanPin', ''), 'sex': contact.get('sex', 0), 'remark': contact.get('remark', ''), 'remark_py_initial': contact.get('remarkPyInitial', ''), 'remark_quan_pin': contact.get('remarkQuanPin', ''), 'signature': contact.get('signature', ''), 'alias': contact.get('alias', ''), 'sns_bg_img': contact.get('snsBgImg', ''), 'country': contact.get('country', ''), 'province': contact.get('province', ''), 'city': contact.get('city', ''), 'big_head_img_url': contact.get('bigHeadImgUrl', ''), 'small_head_img_url': contact.get('smallHeadImgUrl', ''), 'description': contact.get('description', ''), 'card_img_url': contact.get('cardImgUrl', ''), 'label_list': contact.get('labelList', ''), 'phone_num_list': json.dumps(contact.get('phoneNumList', [])) if contact.get( 'phoneNumList') else '', 'type': contact_type } # 构建SQL语句 fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) values = tuple(data.values()) # 使用INSERT ... ON DUPLICATE KEY UPDATE语法 update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'user_name']) sql = f""" INSERT INTO t_wechat_contacts ({fields}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {update_clause} """ self.execute_update(sql, values) self.LOG.info(f"成功保存{len(contacts_data)}个{contact_type}类型的联系人") return True except Exception as e: self.LOG.error(f"保存{contact_type}类型的联系人失败: {e}") return False def save_simple_contacts(self, contact_list: List[str], contact_type: str) -> bool: """保存简单联系人列表(只有user_name)到数据库 Args: contact_list: 联系人ID列表 contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: bool: 是否成功保存 """ if not contact_list: self.LOG.warning(f"没有{contact_type}类型的联系人数据需要保存") return True try: for user_name in contact_list: # 构建SQL语句 sql = """ INSERT INTO t_wechat_contacts (user_name, type) VALUES (%s, %s) ON DUPLICATE KEY UPDATE type = VALUES(type), update_time = CURRENT_TIMESTAMP """ self.execute_update(sql, (user_name, contact_type)) self.LOG.info(f"成功保存{len(contact_list)}个{contact_type}类型的简单联系人") return True except Exception as e: self.LOG.error(f"保存{contact_type}类型的简单联系人失败: {e}") return False def get_contacts_by_type(self, contact_type: str) -> List[Dict]: """根据类型获取联系人列表 Args: contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: List[Dict]: 联系人列表 """ try: sql = """ SELECT * FROM t_wechat_contacts WHERE type = %s ORDER BY nick_name """ results = self.execute_query(sql, (contact_type,)) return results except Exception as e: self.LOG.error(f"获取{contact_type}类型的联系人失败: {e}") return [] def get_contact_by_user_name(self, user_name: str) -> Optional[Dict]: """根据user_name获取联系人信息 Args: user_name: 联系人ID Returns: Optional[Dict]: 联系人信息,如果不存在则返回None """ try: sql = """ SELECT * FROM t_wechat_contacts WHERE user_name = %s LIMIT 1 """ result = self.execute_query(sql, (user_name,), fetch_one=True) return result except Exception as e: self.LOG.error(f"获取联系人{user_name}失败: {e}") return None def get_display_name(self, user_name: str) -> str: """获取联系人的显示名称(优先使用备注,其次是昵称,最后是微信ID) Args: user_name: 联系人ID Returns: str: 显示名称 """ contact = self.get_contact_by_user_name(user_name) if not contact: return user_name return contact.get('remark') or contact.get('nick_name') or user_name def get_all_contacts(self) -> Dict[str, str]: """获取所有联系人信息 Returns: Dict[str, str]: 联系人字典,格式为 {"wxid": "NickName"} """ try: sql = """ SELECT user_name, nick_name, remark FROM t_wechat_contacts union all SELECT wxid as user_name, nick_name, display_name as remark FROM t_chatroom_member union all SELECT chatroom_id as user_name, nick_name, remark as remark FROM t_chatrooms """ results = self.execute_query(sql) contacts_dict = {} for result in results: user_name = result.get('user_name') # 优先使用备注,其次是昵称,最后是微信ID display_name = result.get('remark') or result.get('nick_name') or user_name contacts_dict[user_name] = display_name self.LOG.info(f"从数据库获取了 {len(contacts_dict)} 个联系人信息") return contacts_dict except Exception as e: self.LOG.error(f"获取所有联系人信息失败: {e}") return {} def get_all_contacts_name_map(self) -> Dict[str, str]: """获取所有联系人的ID到显示名称的映射 Returns: Dict[str, str]: 联系人ID到显示名称的映射 """ try: sql = """ SELECT user_name, remark, nick_name FROM t_wechat_contacts """ results = self.execute_query(sql) name_map = {} for result in results: user_name = result.get('user_name') remark = result.get('remark') nick_name = result.get('nick_name') display_name = remark or nick_name or user_name name_map[user_name] = display_name return name_map except Exception as e: self.LOG.error(f"获取所有联系人名称映射失败: {e}") return {} def save_chatroom_member_simple(self, chatroom_id: str, member_details: List[Dict]) -> bool: """保存简化版的群成员信息到数据库 Args: chatroom_id: 群聊ID member_details: 群成员信息列表,格式为: [{'UserName': str, 'NickName': str, 'InviterUserName': str, 'ChatroomMemberFlag': int, 'DisplayName': str}] Returns: bool: 是否成功保存 """ if not member_details or not chatroom_id: self.LOG.warning(f"没有群聊{chatroom_id}的成员信息需要保存") return False try: for member in member_details: # 处理新的数据结构 wxid = "" if "UserName" in member: if isinstance(member["UserName"], dict): wxid = member["UserName"].get("string", "") else: wxid = member.get("UserName", "") elif "wxid" in member: wxid = member.get("wxid", "") if not wxid: continue # 提取昵称 nick_name = "" if "NickName" in member: if isinstance(member["NickName"], dict): nick_name = member["NickName"].get("string", "") else: nick_name = member.get("NickName", "") elif "nickName" in member: nick_name = member.get("nickName", "") # 提取邀请人 inviter_user_name = "" if "InviterUserName" in member: inviter_user_name = member.get("InviterUserName", "") elif "inviterUserName" in member: inviter_user_name = member.get("inviterUserName", "") # 提取成员标志 member_flag = 0 if "ChatroomMemberFlag" in member: member_flag = member.get("ChatroomMemberFlag", 0) elif "memberFlag" in member: member_flag = member.get("memberFlag", 0) # 判断是否为群主 is_owner = 0 if chatroom_id and wxid: # 查询群信息,检查是否为群主 chat_room_owner_sql = """ SELECT chat_room_owner FROM t_chatrooms WHERE chatroom_id = %s """ chat_room_owner_result = self.execute_query(chat_room_owner_sql, (chatroom_id,), fetch_one=True) if chat_room_owner_result and chat_room_owner_result.get('chat_room_owner') == wxid: is_owner = 1 # 构建数据 data = { 'chatroom_id': chatroom_id, 'wxid': wxid, 'nick_name': nick_name, 'display_name': member.get('DisplayName', ''), 'inviter_user_name': inviter_user_name, 'member_flag': member_flag, 'big_head_img_url': member.get('bigHeadImgUrl', ''), 'small_head_img_url': member.get('smallHeadImgUrl', ''), 'is_owner': is_owner, 'is_admin': 1 if member_flag == 2049 else 0, # 根据memberFlag判断是否为管理员 # 其他字段使用默认值 'sex': 0, 'signature': '', 'alias': '', 'country': '', 'province': '', 'city': '', 'label_list': '', 'phone_num_list': '', 'py_initial': '', 'quan_pin': '', 'remark_py_initial': '', 'remark_quan_pin': '' } # 构建SQL语句 fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) values = tuple(data.values()) sql = f""" REPLACE INTO t_chatroom_member ({fields}) VALUES ({placeholders}) """ self.execute_update(sql, values) self.LOG.info(f"成功保存群聊{chatroom_id}的{len(member_details)}个成员信息") return True except Exception as e: self.LOG.error(f"保存群聊{chatroom_id}的成员信息失败: {e}") return False def save_chatroom_member_detail(self, chatroom_id: str, member_details: List[Dict]) -> bool: """保存群成员详细信息到数据库 Args: chatroom_id: 群聊ID member_details: 群成员详细信息列表 Returns: bool: 是否成功保存 """ if not member_details or not chatroom_id: self.LOG.warning(f"没有群聊{chatroom_id}的成员详细信息需要保存") return False try: # 获取现有的群成员信息,以便更新而不是替换 existing_members_sql = """ SELECT wxid, is_owner, is_admin FROM t_chatroom_member WHERE chatroom_id = %s """ existing_members_result = self.execute_query(existing_members_sql, (chatroom_id,)) existing_members = {row.get('wxid'): (row.get('is_owner'), row.get('is_admin')) for row in existing_members_result} for member in member_details: wxid = member.get('userName', '') if not wxid: continue # 保留现有的群主和管理员标识 is_owner, is_admin = 0, 0 if wxid in existing_members: is_owner, is_admin = existing_members[wxid] # 处理电话号码列表 phone_num_list = member.get('phoneNumList', []) if phone_num_list: phone_num_str = json.dumps(phone_num_list) else: phone_num_str = '' # 构建数据 data = { 'chatroom_id': chatroom_id, 'wxid': wxid, 'nick_name': member.get('nickName', ''), 'display_name': member.get('remark', ''), # 使用备注作为群内显示名称 'inviter_user_name': member.get('inviterUserName', ''), 'member_flag': member.get('memberFlag', 0), 'big_head_img_url': member.get('bigHeadImgUrl', ''), 'small_head_img_url': member.get('smallHeadImgUrl', ''), 'is_owner': is_owner, 'is_admin': is_admin, # 额外的详细信息字段 'sex': member.get('sex', 0), 'signature': member.get('signature', ''), 'alias': member.get('alias', ''), 'country': member.get('country', ''), 'province': member.get('province', ''), 'city': member.get('city', ''), 'label_list': member.get('labelList', ''), 'phone_num_list': phone_num_str, 'py_initial': member.get('pyInitial', ''), 'quan_pin': member.get('quanPin', ''), 'remark_py_initial': member.get('remarkPyInitial', ''), 'remark_quan_pin': member.get('remarkQuanPin', '') } # 构建SQL语句 - 使用REPLACE INTO确保更新现有记录 fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) values = tuple(data.values()) sql = f""" REPLACE INTO t_chatroom_member ({fields}) VALUES ({placeholders}) """ self.execute_update(sql, values) self.LOG.info(f"成功保存群聊{chatroom_id}的{len(member_details)}个成员详细信息") return True except Exception as e: self.LOG.error(f"保存群聊{chatroom_id}的成员详细信息失败: {e}") return False def process_chatroom_member_detail_response(self, chatroom_id: str, response: Dict) -> bool: """处理获取群成员详情的API响应 Args: chatroom_id: 群聊ID response: API响应数据 Returns: bool: 是否成功处理 """ try: if response.get('ret') != 200: self.LOG.error(f"获取群聊{chatroom_id}成员详情失败: {response.get('msg')}") return False data = response.get('data', []) if not data: self.LOG.warning(f"群聊{chatroom_id}成员详情数据为空") return False return self.save_chatroom_member_detail(chatroom_id, data) except Exception as e: self.LOG.error(f"处理群聊{chatroom_id}成员详情数据失败: {e}") return False def save_chatroom_info(self, chatroom_data: dict) -> bool: """保存群信息到数据库""" try: # 处理新的数据结构 user_name = "" if isinstance(chatroom_data, dict): if "UserName" in chatroom_data and isinstance(chatroom_data["UserName"], dict): user_name = chatroom_data["UserName"].get("string", "") elif "chatroomId" in chatroom_data: user_name = chatroom_data.get("chatroomId", "") if not user_name: self.LOG.warning("无法获取群聊ID,保存失败") return False # 提取群聊名称 nick_name = "" if "NickName" in chatroom_data and isinstance(chatroom_data["NickName"], dict): nick_name = chatroom_data["NickName"].get("string", "") elif "chatroomName" in chatroom_data: nick_name = chatroom_data.get("chatroomName", "") # 提取群主信息 chat_room_owner = "" if "ChatRoomOwner" in chatroom_data: chat_room_owner = chatroom_data.get("ChatRoomOwner", "") # 提取群通知设置 chat_room_notify = 0 if "ChatRoomNotify" in chatroom_data: chat_room_notify = chatroom_data.get("ChatRoomNotify", 0) # 提取群头像 small_head_img_url = "" if "SmallHeadImgUrl" in chatroom_data: small_head_img_url = chatroom_data.get("SmallHeadImgUrl", "").strip() # 提取群成员列表 member_list = [] if "NewChatroomData" in chatroom_data and "ChatRoomMember" in chatroom_data["NewChatroomData"]: member_list = chatroom_data["NewChatroomData"].get("ChatRoomMember", []) elif "memberList" in chatroom_data: member_list = chatroom_data.get("memberList", []) data = { 'chatroom_id': user_name, 'nick_name': nick_name, 'py_initial': chatroom_data.get('Pyinitial', {}).get('string', ''), 'quan_pin': chatroom_data.get('QuanPin', {}).get('string', ''), 'sex': chatroom_data.get('Sex', 0), 'remark': chatroom_data.get('Remark', {}).get('string', ''), 'remark_py_initial': chatroom_data.get('RemarkPyinitial', {}).get('string', ''), 'remark_quan_pin': chatroom_data.get('RemarkQuanPin', {}).get('string', ''), 'chat_room_notify': chat_room_notify, 'chat_room_owner': chat_room_owner, 'small_head_img_url': small_head_img_url, 'member_list': json.dumps(member_list) } fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'chatroom_id']) values = tuple(data.values()) sql = f""" INSERT INTO t_chatrooms ({fields}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {update_clause} """ self.execute_update(sql, values) self.LOG.info(f"成功保存群聊 {data['chatroom_id']} 信息") return True except Exception as e: self.LOG.error(f"保存群聊信息失败: {e}") return False def get_chatroom_info(self, chatroom_id: str) -> Optional[dict]: """获取群信息""" try: sql = "SELECT * FROM t_chatrooms WHERE chatroom_id = %s LIMIT 1" result = self.execute_query(sql, (chatroom_id,), fetch_one=True) if result and result.get('member_list'): result['member_list'] = json.loads(result['member_list']) return result except Exception as e: self.LOG.error(f"获取群聊{chatroom_id}信息失败: {e}") return None def update_chatroom_info(self, chatroom_id: str, update_data: dict) -> bool: """更新群信息""" try: set_clause = ', '.join([f"{k}=%s" for k in update_data.keys()]) values = list(update_data.values()) values.append(chatroom_id) sql = f"UPDATE t_chatrooms SET {set_clause} WHERE chatroom_id = %s" self.execute_update(sql, tuple(values)) self.LOG.info(f"成功更新群聊 {chatroom_id} 信息") return True except Exception as e: self.LOG.error(f"更新群聊{chatroom_id}信息失败: {e}") return False def delete_chatroom_info(self, chatroom_id: str) -> bool: """删除群信息""" try: sql = "DELETE FROM t_chatrooms WHERE chatroom_id = %s" self.execute_update(sql, (chatroom_id,)) self.LOG.info(f"成功删除群聊 {chatroom_id} 信息") return True except Exception as e: self.LOG.error(f"删除群聊{chatroom_id}信息失败: {e}") return False