# -*- coding: utf-8 -*- """ 微信联系人数据库操作类 用于管理微信联系人信息的存储和查询 """ import logging import json from typing import List, Dict, Optional, Union, Any from db.base import BaseDBOperator from db.connection import DBConnectionManager logger = logging.getLogger(__name__) class ContactsDBOperator(BaseDBOperator): """微信联系人数据库操作类""" def __init__(self, db_manager=None): """初始化联系人数据库操作类""" super().__init__(db_manager or DBConnectionManager.get_instance()) self.logger = logging.getLogger("ContactsDBOperator") def _ensure_table_exists(self): """确保联系人表存在""" try: # 创建联系人表 self.execute_update(""" CREATE TABLE IF NOT EXISTS t_wechat_contacts ( id INT AUTO_INCREMENT PRIMARY KEY, user_name VARCHAR(64) NOT NULL COMMENT '微信ID', nick_name VARCHAR(128) COMMENT '昵称', py_initial VARCHAR(128) COMMENT '拼音首字母', quan_pin VARCHAR(256) COMMENT '全拼', sex TINYINT COMMENT '性别:1男,2女,0未知', remark VARCHAR(128) COMMENT '备注', remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母', remark_quan_pin VARCHAR(256) COMMENT '备注全拼', signature TEXT COMMENT '个性签名', alias VARCHAR(128) COMMENT '微信号', sns_bg_img TEXT COMMENT '朋友圈背景图', country VARCHAR(64) COMMENT '国家', province VARCHAR(64) COMMENT '省份', city VARCHAR(64) COMMENT '城市', big_head_img_url TEXT COMMENT '大头像URL', small_head_img_url TEXT COMMENT '小头像URL', description TEXT COMMENT '描述', card_img_url TEXT COMMENT '名片图片URL', label_list TEXT COMMENT '标签列表', phone_num_list TEXT COMMENT '电话号码列表', type ENUM('friends', 'chatrooms', 'ghs') NOT NULL COMMENT '联系人类型:好友、群聊、公众号', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY `idx_user_name` (`user_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信联系人信息表'; """) # 创建群成员表 - 增加了更多字段以支持详细信息 self.execute_update(""" CREATE TABLE IF NOT EXISTS t_chatroom_member ( id INT AUTO_INCREMENT PRIMARY KEY, chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID', nick_name VARCHAR(128) COMMENT '成员昵称', display_name VARCHAR(128) COMMENT '群内显示名称', inviter_user_name VARCHAR(64) COMMENT '邀请人微信ID', member_flag INT COMMENT '成员标志,2049表示管理员', big_head_img_url TEXT COMMENT '大头像URL', small_head_img_url TEXT COMMENT '小头像URL', is_owner TINYINT(1) DEFAULT 0 COMMENT '是否群主:0否,1是', is_admin TINYINT(1) DEFAULT 0 COMMENT '是否管理员:0否,1是', sex TINYINT COMMENT '性别:1男,2女,0未知', signature TEXT COMMENT '个性签名', alias VARCHAR(128) COMMENT '微信号', country VARCHAR(64) COMMENT '国家', province VARCHAR(64) COMMENT '省份', city VARCHAR(64) COMMENT '城市', label_list TEXT COMMENT '标签列表', phone_num_list TEXT COMMENT '电话号码列表', py_initial VARCHAR(128) COMMENT '拼音首字母', quan_pin VARCHAR(256) COMMENT '全拼', remark_py_initial VARCHAR(128) COMMENT '备注拼音首字母', remark_quan_pin VARCHAR(256) COMMENT '备注全拼', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY `idx_chatroom_member` (`chatroom_id`, `wxid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='微信群成员信息表'; """) self.logger.info("成功创建或确认微信联系人表和群成员表存在") except Exception as e: self.logger.error(f"创建微信联系人表或群成员表失败: {e}") raise def save_contacts(self, contacts_data: List[Dict], contact_type: str) -> bool: """保存联系人信息到数据库 Args: contacts_data: 联系人数据列表 contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: bool: 是否成功保存 """ if not contacts_data: self.logger.warning(f"没有{contact_type}类型的联系人数据需要保存") return True try: for contact in contacts_data: # 将驼峰命名转换为下划线命名 data = { 'user_name': contact.get('userName', ''), 'nick_name': contact.get('nickName', ''), 'py_initial': contact.get('pyInitial', ''), 'quan_pin': contact.get('quanPin', ''), 'sex': contact.get('sex', 0), 'remark': contact.get('remark', ''), 'remark_py_initial': contact.get('remarkPyInitial', ''), 'remark_quan_pin': contact.get('remarkQuanPin', ''), 'signature': contact.get('signature', ''), 'alias': contact.get('alias', ''), 'sns_bg_img': contact.get('snsBgImg', ''), 'country': contact.get('country', ''), 'province': contact.get('province', ''), 'city': contact.get('city', ''), 'big_head_img_url': contact.get('bigHeadImgUrl', ''), 'small_head_img_url': contact.get('smallHeadImgUrl', ''), 'description': contact.get('description', ''), 'card_img_url': contact.get('cardImgUrl', ''), 'label_list': contact.get('labelList', ''), 'phone_num_list': json.dumps(contact.get('phoneNumList', [])) if contact.get( 'phoneNumList') else '', 'type': contact_type } # 构建SQL语句 fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) values = tuple(data.values()) # 使用INSERT ... ON DUPLICATE KEY UPDATE语法 update_clause = ', '.join([f"{k}=VALUES({k})" for k in data.keys() if k != 'user_name']) sql = f""" INSERT INTO t_wechat_contacts ({fields}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {update_clause} """ self.execute_update(sql, values) self.logger.info(f"成功保存{len(contacts_data)}个{contact_type}类型的联系人") return True except Exception as e: self.logger.error(f"保存{contact_type}类型的联系人失败: {e}") return False def save_simple_contacts(self, contact_list: List[str], contact_type: str) -> bool: """保存简单联系人列表(只有user_name)到数据库 Args: contact_list: 联系人ID列表 contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: bool: 是否成功保存 """ if not contact_list: self.logger.warning(f"没有{contact_type}类型的联系人数据需要保存") return True try: for user_name in contact_list: # 构建SQL语句 sql = """ INSERT INTO t_wechat_contacts (user_name, type) VALUES (%s, %s) ON DUPLICATE KEY UPDATE type = VALUES(type), update_time = CURRENT_TIMESTAMP """ self.execute_update(sql, (user_name, contact_type)) self.logger.info(f"成功保存{len(contact_list)}个{contact_type}类型的简单联系人") return True except Exception as e: self.logger.error(f"保存{contact_type}类型的简单联系人失败: {e}") return False def get_contacts_by_type(self, contact_type: str) -> List[Dict]: """根据类型获取联系人列表 Args: contact_type: 联系人类型,可选值:'friends', 'chatrooms', 'ghs' Returns: List[Dict]: 联系人列表 """ try: sql = """ SELECT * FROM t_wechat_contacts WHERE type = %s ORDER BY nick_name """ results = self.execute_query(sql, (contact_type,)) return results except Exception as e: self.logger.error(f"获取{contact_type}类型的联系人失败: {e}") return [] def get_contact_by_user_name(self, user_name: str) -> Optional[Dict]: """根据user_name获取联系人信息 Args: user_name: 联系人ID Returns: Optional[Dict]: 联系人信息,如果不存在则返回None """ try: sql = """ SELECT * FROM t_wechat_contacts WHERE user_name = %s LIMIT 1 """ result = self.execute_query(sql, (user_name,), fetch_one=True) return result except Exception as e: self.logger.error(f"获取联系人{user_name}失败: {e}") return None def get_display_name(self, user_name: str) -> str: """获取联系人的显示名称(优先使用备注,其次是昵称,最后是微信ID) Args: user_name: 联系人ID Returns: str: 显示名称 """ contact = self.get_contact_by_user_name(user_name) if not contact: return user_name return contact.get('remark') or contact.get('nick_name') or user_name def get_all_contacts_name_map(self) -> Dict[str, str]: """获取所有联系人的ID到显示名称的映射 Returns: Dict[str, str]: 联系人ID到显示名称的映射 """ try: sql = """ SELECT user_name, remark, nick_name FROM t_wechat_contacts """ results = self.execute_query(sql) name_map = {} for result in results: user_name = result.get('user_name') remark = result.get('remark') nick_name = result.get('nick_name') display_name = remark or nick_name or user_name name_map[user_name] = display_name return name_map except Exception as e: self.logger.error(f"获取所有联系人名称映射失败: {e}") return {} def save_chatroom_member_detail(self, chatroom_id: str, member_details: List[Dict]) -> bool: """保存群成员详细信息到数据库 Args: chatroom_id: 群聊ID member_details: 群成员详细信息列表 Returns: bool: 是否成功保存 """ if not member_details or not chatroom_id: self.logger.warning(f"没有群聊{chatroom_id}的成员详细信息需要保存") return False try: # 获取现有的群成员信息,以便更新而不是替换 existing_members_sql = """ SELECT wxid, is_owner, is_admin FROM t_chatroom_member WHERE chatroom_id = %s """ existing_members_result = self.execute_query(existing_members_sql, (chatroom_id,)) existing_members = {row.get('wxid'): (row.get('is_owner'), row.get('is_admin')) for row in existing_members_result} for member in member_details: wxid = member.get('userName', '') if not wxid: continue # 保留现有的群主和管理员标识 is_owner, is_admin = 0, 0 if wxid in existing_members: is_owner, is_admin = existing_members[wxid] # 处理电话号码列表 phone_num_list = member.get('phoneNumList', []) if phone_num_list: phone_num_str = json.dumps(phone_num_list) else: phone_num_str = '' # 构建数据 data = { 'chatroom_id': chatroom_id, 'wxid': wxid, 'nick_name': member.get('nickName', ''), 'display_name': member.get('remark', ''), # 使用备注作为群内显示名称 'inviter_user_name': member.get('inviterUserName', ''), 'member_flag': member.get('memberFlag', 0), 'big_head_img_url': member.get('bigHeadImgUrl', ''), 'small_head_img_url': member.get('smallHeadImgUrl', ''), 'is_owner': is_owner, 'is_admin': is_admin, # 额外的详细信息字段 'sex': member.get('sex', 0), 'signature': member.get('signature', ''), 'alias': member.get('alias', ''), 'country': member.get('country', ''), 'province': member.get('province', ''), 'city': member.get('city', ''), 'label_list': member.get('labelList', ''), 'phone_num_list': phone_num_str, 'py_initial': member.get('pyInitial', ''), 'quan_pin': member.get('quanPin', ''), 'remark_py_initial': member.get('remarkPyInitial', ''), 'remark_quan_pin': member.get('remarkQuanPin', '') } # 构建SQL语句 - 使用REPLACE INTO确保更新现有记录 fields = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) values = tuple(data.values()) sql = f""" REPLACE INTO t_chatroom_member ({fields}) VALUES ({placeholders}) """ self.execute_update(sql, values) self.logger.info(f"成功保存群聊{chatroom_id}的{len(member_details)}个成员详细信息") return True except Exception as e: self.logger.error(f"保存群聊{chatroom_id}的成员详细信息失败: {e}") return False def process_chatroom_member_detail_response(self, chatroom_id: str, response: Dict) -> bool: """处理获取群成员详情的API响应 Args: chatroom_id: 群聊ID response: API响应数据 Returns: bool: 是否成功处理 """ try: if response.get('ret') != 200: self.logger.error(f"获取群聊{chatroom_id}成员详情失败: {response.get('msg')}") return False data = response.get('data', []) if not data: self.logger.warning(f"群聊{chatroom_id}成员详情数据为空") return False return self.save_chatroom_member_detail(chatroom_id, data) except Exception as e: self.logger.error(f"处理群聊{chatroom_id}成员详情数据失败: {e}") return False