# Helpers for deciding whether a chat message refers to a recently posted or
# quoted image, and for packaging that image as a base64 ``data:`` URL.
from __future__ import annotations

import base64
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional

try:
    # ``imghdr`` was removed from the standard library in Python 3.13
    # (PEP 594).  This module no longer depends on it (see
    # ``_detect_image_type``); the guarded import only keeps the name
    # available on older interpreters.
    import imghdr
except ImportError:
    imghdr = None  # type: ignore[assignment]

try:
    from wechat_ipad import WechatAPIClient
except ImportError:
    # The client class is referenced only in a (lazily evaluated) annotation,
    # so the pure helpers stay importable when the SDK is not installed.
    WechatAPIClient = Any  # type: ignore[assignment,misc]


# Keyword tables used by ``is_recent_image_followup``.
_IMAGE_WORDS = ("图", "图片", "照片", "截图", "表情包", "这张", "那张", "这图", "这p")
_ASK_WORDS = (
    "看看", "看下", "帮我看", "帮看看", "这个", "咋样", "什么",
    "识别", "分析", "评价", "点评",
)
_COMMENT_WORDS = (
    "好看", "丑", "离谱", "抽象", "逆天", "蚌埠住", "绷不住", "乐", "笑死",
    "色", "涩", "帅", "美", "绝了", "一般", "可以", "不行", "怪", "尬", "像",
)
_PRONOUN_WORDS = ("这个", "这", "那", "她", "他", "它")

# Longest message (in characters) still treated as a terse follow-up.
_SHORT_FOLLOWUP_MAX_CHARS = 18
_DEFAULT_FOLLOWUP_WINDOW_MINUTES = 5
_QUOTE_IMAGE_LABEL = "引用图片"
_TIMESTAMP_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")

# ``aeskey`` must not be matched inside a longer attribute name such as
# ``cdnthumbaeskey``; the loose pattern is kept as a fallback.
_AESKEY_EXACT_RE = re.compile(r'(?<![\w-])aeskey="([^"]+)"')
_AESKEY_LOOSE_RE = re.compile(r'aeskey="([^"]+)"')
# URL attributes in order of preference.
_QUOTE_URL_RES = tuple(
    re.compile(attr + r'="([^"]+)"')
    for attr in ("cdnmidimgurl", "cdnbigimgurl", "cdnthumburl")
)


def _contains_any(text: str, words) -> bool:
    """Return True when any of *words* occurs as a substring of *text*."""
    return any(word in text for word in words)


def _detect_image_type(data: bytes) -> str:
    """Guess the image subtype of *data* from its leading magic bytes.

    Returns one of ``jpeg``, ``png``, ``gif``, ``webp``, ``bmp`` or ``tiff``;
    unrecognised data falls back to ``jpeg``.
    """
    if data.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "gif"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "webp"
    if data.startswith(b"BM"):
        return "bmp"
    if data[:4] in (b"II*\x00", b"MM\x00*"):
        return "tiff"
    return "jpeg"


def build_recent_image_context(
    *,
    message: Dict[str, Any],
    room_id: str,
    content: str,
    quote_context: Dict[str, str],
    get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
    get_sender_name: Callable[[str, str], str],
    image_config: Dict[str, Any],
) -> Dict[str, str]:
    """Describe the most recent room image when *content* looks like a follow-up.

    Returns an empty dict when the message already quotes something, when no
    earlier image is found via ``get_latest_image_message``, or when
    ``is_recent_image_followup`` rejects the text.  Otherwise returns a dict
    with the keys ``sender_name``, ``image_path``, ``hint`` and ``timestamp``.
    """
    if quote_context:
        # An explicit quote takes precedence over guessing from recency.
        return {}

    latest_image = get_latest_image_message(
        room_id,
        before_timestamp=str(message.get("timestamp") or ""),
    )
    if not latest_image:
        return {}
    if not is_recent_image_followup(content, latest_image, image_config):
        return {}

    sender = str(latest_image.get("sender", "") or "")
    sender_name = get_sender_name(room_id, sender) if sender else "未知成员"
    return {
        "sender_name": sender_name,
        "image_path": str(latest_image.get("image_path", "") or ""),
        "hint": "用户当前这句大概率是在追问这张最近图片",
        "timestamp": str(latest_image.get("timestamp", "") or ""),
    }


def is_recent_image_followup(
    content: str,
    latest_image: Optional[Dict[str, Any]] = None,
    image_config: Dict[str, Any] | None = None,
) -> bool:
    """Heuristically decide whether *content* is talking about an image.

    Two rules are applied to the stripped, lower-cased text:

    1. It contains an image word together with an ask or comment word.
    2. *latest_image* is within the configured time window and the text is at
       most 18 characters, contains a pronoun, and contains an ask or comment
       word.
    """
    text = str(content or "").strip().lower()
    if not text:
        return False

    has_reaction = _contains_any(text, _ASK_WORDS + _COMMENT_WORDS)
    if has_reaction and _contains_any(text, _IMAGE_WORDS):
        return True

    if latest_image and is_recent_image_close_enough(latest_image, image_config or {}):
        is_short = len(text) <= _SHORT_FOLLOWUP_MAX_CHARS
        if is_short and has_reaction and _contains_any(text, _PRONOUN_WORDS):
            return True
    return False


def build_image_safety_hints(
    *,
    message: Dict[str, Any],
    content: str,
    quote_context: Dict[str, str],
    image_context: Dict[str, str],
    image_urls: List[str],
    get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
    image_config: Dict[str, Any],
) -> Dict[str, Any]:
    """Summarise whether the message is probably about an image.

    Returns a dict with ``suspected`` (bool), ``has_visual_context`` (bool,
    whether any image URL was attached) and ``reason`` (str, empty when
    nothing is suspected).
    """
    has_visual_context = bool(image_urls)

    # ``quote_context`` may be empty/None; treat that the same as "no quote".
    if (quote_context or {}).get("quote_type_label") == _QUOTE_IMAGE_LABEL:
        return {
            "suspected": True,
            "has_visual_context": has_visual_context,
            "reason": "用户当前是在引用图片后发言",
        }

    if image_context:
        if has_visual_context:
            reason = "用户当前大概率在接最近一张群图片"
        else:
            reason = "识别到图片跟评,但本地图片未成功附带给模型"
        return {
            "suspected": True,
            "has_visual_context": has_visual_context,
            "reason": reason,
        }

    latest_image = get_latest_image_message(
        str(message.get("roomid") or ""),
        before_timestamp=str(message.get("timestamp") or ""),
    )
    if latest_image and is_recent_image_followup(content, latest_image, image_config):
        return {
            "suspected": True,
            "has_visual_context": False,
            "reason": "最近刚出现图片,但这次没有拿到图片内容",
        }

    return {
        "suspected": False,
        "has_visual_context": has_visual_context,
        "reason": "",
    }


def is_recent_image_close_enough(latest_image: Dict[str, Any], image_config: Dict[str, Any]) -> bool:
    """Return True when the image was posted within the follow-up window.

    The window is ``image_config["recent_followup_window_minutes"]`` (default
    5, minimum 1); a value that cannot be converted to ``int`` falls back to
    the default instead of raising.  The image timestamp is compared against
    ``datetime.now()``; an unparseable timestamp yields False.
    """
    raw_window = image_config.get(
        "recent_followup_window_minutes", _DEFAULT_FOLLOWUP_WINDOW_MINUTES
    )
    try:
        window_minutes = int(raw_window or _DEFAULT_FOLLOWUP_WINDOW_MINUTES)
    except (TypeError, ValueError):
        window_minutes = _DEFAULT_FOLLOWUP_WINDOW_MINUTES
    window_minutes = max(window_minutes, 1)

    image_time = parse_message_time(str(latest_image.get("timestamp") or ""))
    if image_time is None:
        return False
    # NOTE(review): timestamps are parsed as naive datetimes and compared with
    # naive local time - confirm the message store uses local time.
    return (datetime.now() - image_time).total_seconds() <= window_minutes * 60


def parse_message_time(value: str) -> Optional[datetime]:
    """Parse a timestamp string, returning None when no known format matches.

    Accepted formats: ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM`` and
    ``YYYY-MM-DD``.  Surrounding whitespace is ignored.
    """
    candidate = str(value or "").strip()
    if not candidate:
        return None
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(candidate, fmt)
        except ValueError:
            continue
    return None


async def prepare_quote_image_inputs(
    *,
    bot: WechatAPIClient,
    quote_context: Dict[str, str],
    log_event: Callable[..., None],
) -> List[str]:
    """Download the image referenced by a quote and return it as data URLs.

    Returns an empty list when the quote is not an image quote, when the
    referenced content carries no usable key/URL, when the download raises,
    or when the downloaded payload is not valid base64.  The last two cases
    are reported through ``log_event("quote_image_fail", reason=...)``.
    """
    if not quote_context or quote_context.get("quote_type_label") != _QUOTE_IMAGE_LABEL:
        return []

    image_info = extract_quote_image_info(quote_context.get("raw_ref_content", "") or "")
    if not image_info:
        return []

    try:
        base64_str = await bot.download_image(
            aeskey=image_info["aeskey"],
            cdnmidimgurl=image_info["url"],
        )
    except Exception as exc:
        # Best effort: any download failure is logged and degrades to
        # "no image" instead of aborting the reply.
        log_event("quote_image_fail", reason=f"download:{exc}")
        return []

    data_url = build_image_data_url(base64_str)
    if not data_url:
        log_event("quote_image_fail", reason="invalid_base64")
        return []
    return [data_url]


def build_local_image_data_url(image_path: str, main_path: Path) -> str:
    """Read an image stored under *main_path* and return it as a data URL.

    *image_path* is taken relative to *main_path* after stripping leading
    slashes/backslashes; both ``/`` and ``\\`` are accepted as separators on
    every platform.  Returns ``""`` when the path is empty, the file cannot
    be read, or the file is empty.
    """
    if not image_path:
        return ""

    # Normalise to forward slashes: pathlib accepts them on Windows and POSIX,
    # whereas a backslash is an ordinary filename character on POSIX.
    relative_path = image_path.lstrip("/\\").replace("\\", "/")
    full_path = Path(main_path) / relative_path
    try:
        image_bytes = full_path.read_bytes()
    except (OSError, ValueError):
        # Missing file, directory, permission problem or invalid path string.
        return ""
    if not image_bytes:
        return ""

    image_type = _detect_image_type(image_bytes)
    raw_base64 = base64.b64encode(image_bytes).decode("utf-8")
    return f"data:image/{image_type};base64,{raw_base64}"


def extract_quote_image_info(ref_content: str) -> Dict[str, str]:
    """Extract the AES key and CDN URL from quoted-image markup.

    Returns ``{"aeskey": ..., "url": ...}`` or an empty dict when either
    attribute is missing.  The URL is the first present of ``cdnmidimgurl``,
    ``cdnbigimgurl`` and ``cdnthumburl``.  A standalone ``aeskey`` attribute
    is preferred over one that merely ends in ``aeskey``.
    """
    if not ref_content:
        return {}

    aeskey_match = _AESKEY_EXACT_RE.search(ref_content) or _AESKEY_LOOSE_RE.search(ref_content)
    if not aeskey_match:
        return {}

    for url_pattern in _QUOTE_URL_RES:
        url_match = url_pattern.search(ref_content)
        if url_match:
            return {
                "aeskey": aeskey_match.group(1),
                "url": url_match.group(1),
            }
    return {}


def build_image_data_url(base64_str: str) -> str:
    """Wrap a base64 image payload in a ``data:image/...;base64,`` URL.

    An existing ``data:...,`` prefix is removed and embedded whitespace is
    dropped before the payload is strictly validated.  Returns ``""`` for
    empty input, invalid base64, or a payload that decodes to no bytes.
    """
    raw_base64 = str(base64_str or "").strip()
    if not raw_base64:
        return ""
    if raw_base64.startswith("data:") and "," in raw_base64:
        raw_base64 = raw_base64.split(",", 1)[1]

    # Line-wrapped base64 is legal; remove whitespace so strict validation
    # only rejects genuinely invalid characters.
    raw_base64 = "".join(raw_base64.split())
    if not raw_base64:
        return ""

    try:
        # validate=True: without it, characters outside the alphabet are
        # silently discarded and garbage can pass as a valid payload.
        image_bytes = base64.b64decode(raw_base64, validate=True)
    except (ValueError, TypeError):
        return ""
    if not image_bytes:
        return ""

    image_type = _detect_image_type(image_bytes)
    return f"data:image/{image_type};base64,{raw_base64}"