import requests import html import re from flask import Blueprint, render_template, jsonify, request, current_app, Response, stream_with_context from .auth import login_required from loguru import logger import xml.etree.ElementTree as ET from datetime import datetime from utils.message_formatter import format_quote_message, parse_quote_message # 创建消息管理蓝图 messages_bp = Blueprint('messages', __name__) logger = logger def _is_emoji_message(msg: dict) -> bool: message_type = str(msg.get('message_type', '')) content = msg.get('content') or '' xml_markers = ('', ' bool: if not value: return False value = str(value).strip() if value.startswith(('http://', 'https://')): return True if 'static/images' in value or 'static\\images' in value: return True if '/' in value or '\\' in value: return True return False def _extract_emoji_preview_url(xml_text: str) -> str: if not xml_text: return '' patterns = ( r'cdnurl\s*=\s*["\']([^"\']+)["\']', r'encrypturl\s*=\s*["\']([^"\']+)["\']', r'externurl\s*=\s*["\']([^"\']+)["\']' ) for pattern in patterns: match = re.search(pattern, xml_text) if match: return html.unescape(match.group(1)) return '' def _parse_sysmsg_payload(content: str) -> dict: payload = { "sysmsg_type": "", "summary": "", "replace_msg": "", "session": "", "msgid": "", "newmsgid": "", } text = str(content or "").strip() if not text.startswith(" Response: if not target_url: return Response("missing url", status=400) headers = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/123.0.0.0 Safari/537.36" ), "Referer": "http://weixin.qq.com/" } range_header = request.headers.get("Range") if range_header: headers["Range"] = range_header upstream = requests.get(target_url, headers=headers, stream=True, timeout=30) response_headers = {} for key in ("Content-Type", "Content-Length", "Content-Range", "Accept-Ranges", "Cache-Control", "ETag", "Last-Modified"): value = upstream.headers.get(key) if value: response_headers[key] = value if "Accept-Ranges" not in response_headers: response_headers["Accept-Ranges"] = "bytes" return Response( stream_with_context(upstream.iter_content(chunk_size=64 * 1024)), status=upstream.status_code, headers=response_headers, direct_passthrough=True ) # 消息列表页面 @messages_bp.route('/messages') @login_required def message_list_page(): """消息列表页面""" return render_template('message_list.html') # API路由 @messages_bp.route('/api/messages', methods=['GET']) @login_required def get_messages(): """获取消息列表API""" try: server = current_app.dashboard_server # 获取查询参数 group_id = request.args.get('group_id') start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d')) end_date = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d')) search_text = request.args.get('search_text') page = int(request.args.get('page', 1)) page_size = int(request.args.get('page_size', 20)) # 调用数据库方法获取消息 result = server.message_storage.get_messages_by_filter( group_id=group_id, start_date=start_date, end_date=end_date, search_text=search_text, page=page, page_size=page_size ) # 处理消息数据,添加群组名称和发送者昵称,并格式化引用消息 for msg in result['messages']: raw_content = str(msg.get('content') or '') # 获取群组名称 msg['group_name'] = server.contact_manager.get_nickname(msg['group_id']) or msg['group_id'] # 获取发送者昵称 msg['sender_name'] = server.contact_manager.get_group_name(msg['group_id'], msg['sender']) or msg['sender'] if raw_content.startswith("' in msg['content']: # 使用格式化工具处理引用消息 quote_data = parse_quote_message(msg['content']) msg['content'] = quote_data.get('formatted_message') or format_quote_message(msg['content']) msg['quoted_type'] = quote_data.get('reference_type', '') msg['quoted_preview_image'] = quote_data.get('preview_image', '') msg['quoted_preview_video_thumb'] = quote_data.get('preview_video_thumb', '') msg['quoted_reference_svrid'] = quote_data.get('reference_svrid', '') msg['quoted_reference_md5'] = quote_data.get('reference_md5', '') # 优先使用原始被引用消息自己已落库的图片/缩略图地址 reference_svrid = quote_data.get('reference_svrid') reference_md5 = quote_data.get('reference_md5') referenced_msg = None if reference_svrid: referenced_msg = server.message_storage.get_message_by_message_id(reference_svrid) if not referenced_msg and reference_md5 and msg['quoted_type'] == 'image': referenced_msg = server.message_storage.get_image_message_by_md5(reference_md5) if referenced_msg: if msg['quoted_type'] == 'image': image_path = referenced_msg.get('image_path') or '' msg['quoted_preview_image'] = image_path if _is_usable_local_media_path(image_path) else '' elif msg['quoted_type'] == 'video': video_thumb = referenced_msg.get('message_thumb') or referenced_msg.get('image_path') or '' msg['quoted_preview_video_thumb'] = video_thumb if _is_usable_local_media_path(video_thumb) else '' else: # 其他类型的应用消息,解析 XML 提取标题 root = ET.fromstring(msg['content']) title_elem = root.find('.//title') if title_elem is not None: msg['content'] = title_elem.text except Exception as e: logger.error(f"解析消息类型49出错: {e}") try: msg['content'] = format_quote_message(msg['content']) except Exception: pass return jsonify(result) except Exception as e: logger.error(f"获取消息列表失败: {e}") return jsonify({'error': str(e)}), 500 @messages_bp.route('/api/groups', methods=['GET']) @login_required def get_groups(): """获取群组列表API""" try: server = current_app.dashboard_server # 获取机器人管理的群组列表 groups = [] for group_id in server.contact_manager.get_contacts(): if '@chatroom' in group_id: groups.append({ 'group_id': group_id, 'group_name': server.contact_manager.get_nickname(group_id) or group_id }) return jsonify({'groups': groups}) except Exception as e: logger.error(f"获取群组列表失败: {e}") return jsonify({'error': str(e)}), 500 @messages_bp.route('/api/hourly_message_trend', methods=['GET']) @login_required def get_hourly_message_trend(): """获取按小时聊天趋势数据API""" try: server = current_app.dashboard_server # 获取查询参数 group_id = request.args.get('group_id') days = int(request.args.get('days', 1)) # 调用数据库方法获取按小时消息趋势数据 trend_data = server.message_storage.get_hourly_message_trend(group_id=group_id, days=days) # 格式化数据为前端需要的格式 hours = [] counts = [] for item in trend_data: hours.append(item['hour_slot']) counts.append(item['message_count']) # 获取群组名称 group_name = server.contact_manager.get_nickname(group_id) if group_id else "所有群组" return jsonify({ 'success': True, 'data': { 'hours': hours, 'counts': counts, 'group_name': group_name } }) except Exception as e: logger.error(f"获取按小时聊天趋势数据失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @messages_bp.route('/api/messages/media_proxy', methods=['GET']) @login_required def message_media_proxy(): target_url = request.args.get('url', '').strip() if not target_url: return Response("missing url", status=400) if target_url.startswith("<"): return Response("invalid media url", status=400) if not re.match(r"^https?://", target_url, flags=re.IGNORECASE): return Response("unsupported media url", status=400) try: return _proxy_remote_media(target_url) except Exception as e: logger.error(f"消息媒体代理失败: {e}") return Response(f"proxy failed: {e}", status=502)