# Source file: abot/plugins/ai_auto_response/context/image_context.py
# Snapshot: 2026-04-09 17:46:30 +08:00 (201 lines, 7.0 KiB, Python)

from __future__ import annotations
import base64
import imghdr
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional
from wechat_ipad import WechatAPIClient
def build_recent_image_context(
    *,
    message: Dict[str, Any],
    room_id: str,
    content: str,
    quote_context: Dict[str, str],
    get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
    get_sender_name: Callable[[str, str], str],
    image_config: Dict[str, Any],
) -> Dict[str, str]:
    """Describe the latest room image when ``content`` looks like a follow-up to it.

    Args:
        message: Incoming message; only its ``"timestamp"`` value is read.
        room_id: Forwarded to both lookup callables.
        content: Text of the incoming message.
        quote_context: When non-empty, no lookup happens and ``{}`` is returned.
        get_latest_image_message: Called as ``(room_id, before_timestamp=...)``;
            a falsy result means there is no candidate image.
        get_sender_name: Called as ``(room_id, sender)`` to resolve a display
            name for the image's sender.
        image_config: Forwarded to ``is_recent_image_followup``.

    Returns:
        ``{}`` when a quote is present, no image is found, or the text is not
        judged a follow-up; otherwise a dict with the keys ``sender_name``,
        ``image_path``, ``hint`` and ``timestamp`` (all strings).
    """
    # A quoted message short-circuits the lookup entirely.
    if quote_context:
        return {}

    cutoff = str(message.get("timestamp") or "")
    candidate = get_latest_image_message(room_id, before_timestamp=cutoff)
    if not candidate or not is_recent_image_followup(content, candidate, image_config):
        return {}

    poster_id = str(candidate.get("sender", "") or "")
    if poster_id:
        poster_name = get_sender_name(room_id, poster_id)
    else:
        # No sender recorded on the image message: use the placeholder name.
        poster_name = "未知成员"

    context: Dict[str, str] = {}
    context["sender_name"] = poster_name
    context["image_path"] = str(candidate.get("image_path", "") or "")
    context["hint"] = "用户当前这句大概率是在追问这张最近图片"
    context["timestamp"] = str(candidate.get("timestamp", "") or "")
    return context
def is_recent_image_followup(
    content: str,
    latest_image: Optional[Dict[str, Any]] = None,
    image_config: Optional[Dict[str, Any]] = None,
) -> bool:
    """Decide whether ``content`` reads like a remark about a recently posted image.

    Two rules are applied to the stripped, lower-cased text:

    1. The text names an image (e.g. "图片", "截图") and also contains an
       ask/comment keyword.
    2. ``latest_image`` is given and ``is_recent_image_close_enough`` accepts
       it, and the text is short (<= 18 characters), contains a pronoun
       keyword, and contains an ask/comment keyword.

    Args:
        content: Message text; ``None`` or blank text yields ``False``.
        latest_image: Most recent image message, if any. Only used by rule 2.
        image_config: Forwarded to ``is_recent_image_close_enough``; ``None``
            is treated as an empty dict.

    Returns:
        ``True`` when either rule matches, otherwise ``False``.
    """
    text = str(content or "").strip().lower()
    if not text:
        return False

    # These keyword lists must never contain "": the empty string is a
    # substring of every string, so a single empty entry makes the matching
    # below succeed for any message.
    # NOTE(review): the empty entries removed here may originally have been
    # keywords that were lost; restore them from the upstream file if so.
    image_words = ("图片", "照片", "截图", "表情包", "这张", "那张", "这图", "这p")
    ask_words = (
        "看看", "看下", "帮我看", "帮看看", "这个", "咋样",
        "什么", "识别", "分析", "评价", "点评",
    )
    comment_words = (
        "好看", "离谱", "抽象", "逆天", "蚌埠住", "绷不住",
        "笑死", "绝了", "一般", "可以", "不行",
    )
    pronoun_words = ("这个",)
    reaction_words = ask_words + comment_words

    def mentions(words) -> bool:
        return any(word in text for word in words)

    # Rule 1: explicit image noun plus an ask/comment keyword.
    if mentions(image_words) and mentions(reaction_words):
        return True

    # Rule 2: short pronoun-style remark right after an image was posted.
    if latest_image and is_recent_image_close_enough(latest_image, image_config or {}):
        return len(text) <= 18 and mentions(pronoun_words) and mentions(reaction_words)

    return False
def build_image_safety_hints(
    *,
    message: Dict[str, Any],
    content: str,
    quote_context: Dict[str, str],
    image_context: Dict[str, str],
    image_urls: List[str],
    get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
    image_config: Dict[str, Any],
) -> Dict[str, Any]:
    """Summarise whether the message seems to concern an image and whether one is attached.

    Checks are made in order; the first that applies decides the result:

    1. The quote is labelled as an image quote ("引用图片").
    2. ``image_context`` is non-empty.
    3. A latest image exists for the room and ``is_recent_image_followup``
       accepts ``content`` against it.
    4. None of the above.

    Args:
        message: Incoming message; its ``"roomid"`` and ``"timestamp"`` values
            are read for the image lookup in step 3.
        content: Text of the incoming message.
        quote_context: Quote details; only ``"quote_type_label"`` is read.
        image_context: Recent-image context built earlier, possibly empty.
        image_urls: Image inputs gathered for the request; only emptiness matters.
        get_latest_image_message: Called as ``(room_id, before_timestamp=...)``.
        image_config: Forwarded to ``is_recent_image_followup``.

    Returns:
        A dict with ``suspected`` (bool), ``has_visual_context`` (bool) and
        ``reason`` (str, empty when nothing is suspected).
    """
    images_attached = bool(image_urls)

    def verdict(suspected: bool, has_visual_context: bool, reason: str) -> Dict[str, Any]:
        return {
            "suspected": suspected,
            "has_visual_context": has_visual_context,
            "reason": reason,
        }

    if quote_context.get("quote_type_label") == "引用图片":
        return verdict(True, images_attached, "用户当前是在引用图片后发言")

    if image_context:
        reason = (
            "用户当前大概率在接最近一张群图片"
            if images_attached
            else "识别到图片跟评,但本地图片未成功附带给模型"
        )
        return verdict(True, images_attached, reason)

    room = str(message.get("roomid") or "")
    cutoff = str(message.get("timestamp") or "")
    candidate = get_latest_image_message(room, before_timestamp=cutoff)
    if candidate and is_recent_image_followup(content, candidate, image_config):
        # Reported as lacking visual context regardless of image_urls.
        return verdict(True, False, "最近刚出现图片,但这次没有拿到图片内容")

    return verdict(False, images_attached, "")
def is_recent_image_close_enough(latest_image: Dict[str, Any], image_config: Dict[str, Any]) -> bool:
    """Report whether ``latest_image`` was posted within the follow-up window.

    The window is ``image_config["recent_followup_window_minutes"]`` (default
    5; falsy values also fall back to 5), clamped to at least one minute. The
    image's ``"timestamp"`` is compared against the local wall clock
    (``datetime.now()``), not against the incoming message's own timestamp.

    Returns:
        ``False`` when the timestamp is missing or cannot be parsed; otherwise
        whether the elapsed time is no more than the window.
    """
    configured_minutes = image_config.get("recent_followup_window_minutes", 5) or 5
    window_seconds = max(int(configured_minutes), 1) * 60

    posted_at = parse_message_time(str(latest_image.get("timestamp") or ""))
    if not posted_at:
        return False

    elapsed = datetime.now() - posted_at
    return elapsed.total_seconds() <= window_seconds
def parse_message_time(value: str) -> Optional[datetime]:
    """Parse a message timestamp string into a naive ``datetime``.

    Accepted layouts, tried in order: ``YYYY-MM-DD HH:MM:SS``,
    ``YYYY-MM-DD HH:MM`` and ``YYYY-MM-DD``.

    Returns:
        The parsed ``datetime``, or ``None`` when ``value`` is empty or matches
        none of the layouts.
    """
    known_layouts = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")
    parsed: Optional[datetime] = None
    if value:
        for layout in known_layouts:
            try:
                parsed = datetime.strptime(value, layout)
            except ValueError:
                # Wrong layout for this string; try the next one.
                continue
            break
    return parsed
async def prepare_quote_image_inputs(
    *,
    bot: WechatAPIClient,
    quote_context: Dict[str, str],
    log_event: Callable[..., None],
) -> List[str]:
    """Download the image referenced by an image quote and return it as a data URL.

    Args:
        bot: Client whose ``download_image(aeskey=..., cdnmidimgurl=...)``
            coroutine is awaited; its result is treated as base64 text.
        quote_context: Quote details. ``"quote_type_label"`` must equal
            "引用图片" and ``"raw_ref_content"`` must carry the image attributes.
        log_event: Called with ``"quote_image_fail"`` and a ``reason`` keyword
            when the download raises or the payload cannot be turned into a
            data URL.

    Returns:
        A one-element list holding the data URL, or ``[]`` when the quote is
        not an image quote, the image info cannot be extracted, the download
        fails, or the payload is rejected.
    """
    is_image_quote = bool(quote_context) and quote_context.get("quote_type_label") == "引用图片"
    if not is_image_quote:
        return []

    image_info = extract_quote_image_info(quote_context.get("raw_ref_content", "") or "")
    if not image_info:
        return []

    try:
        payload = await bot.download_image(
            aeskey=image_info["aeskey"],
            cdnmidimgurl=image_info["url"],
        )
    except Exception as exc:
        # Best effort: a failed download is logged and yields no image input.
        log_event("quote_image_fail", reason=f"download:{exc}")
        return []

    data_url = build_image_data_url(payload)
    if data_url:
        return [data_url]

    log_event("quote_image_fail", reason="invalid_base64")
    return []
def build_local_image_data_url(image_path: str, main_path: Path) -> str:
    """Read an image stored under ``main_path`` and return it as a ``data:`` URL.

    ``image_path`` may use ``/`` or ``\\`` as separators and may start with
    either; it is always resolved relative to ``main_path``. The path is split
    into components and re-joined with ``pathlib`` so the lookup works on both
    Windows and POSIX hosts (forcing backslashes only resolves on Windows).

    Args:
        image_path: Stored relative path of the image; empty means "no image".
        main_path: Base directory the stored path is relative to.

    Returns:
        ``data:image/<subtype>;base64,<payload>``, or ``""`` when the path is
        empty, the file cannot be read, or the file is empty.
    """
    if not image_path:
        return ""

    components = [part for part in re.split(r"[\\/]+", image_path) if part]
    if not components:
        return ""
    full_path = main_path.joinpath(*components)

    try:
        image_bytes = full_path.read_bytes()
    except (OSError, ValueError):
        # Missing file, directory, permission problem or malformed path:
        # treat all of them as "no local image available".
        return ""
    if not image_bytes:
        return ""

    image_type = _sniff_local_image_subtype(image_bytes)
    raw_base64 = base64.b64encode(image_bytes).decode("utf-8")
    return f"data:image/{image_type};base64,{raw_base64}"


def _sniff_local_image_subtype(image_bytes: bytes) -> str:
    """Return the ``image/*`` subtype implied by the leading bytes, defaulting to ``"jpeg"``.

    Replaces ``imghdr.what``, which was removed from the standard library in
    Python 3.13.
    """
    if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if image_bytes[:6] in (b"GIF87a", b"GIF89a"):
        return "gif"
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "webp"
    if image_bytes.startswith(b"BM"):
        return "bmp"
    if image_bytes[:2] in (b"MM", b"II"):
        return "tiff"
    return "jpeg"
def extract_quote_image_info(ref_content: str) -> Dict[str, str]:
    """Pull the AES key and CDN URL of a quoted image out of its raw markup.

    The key is read from the ``aeskey="..."`` attribute. The URL is taken from
    the first attribute present among ``cdnmidimgurl``, ``cdnbigimgurl`` and
    ``cdnthumburl``, in that order.

    Args:
        ref_content: Raw quoted-message content containing the image attributes.

    Returns:
        ``{"aeskey": ..., "url": ...}``, or ``{}`` when ``ref_content`` is
        empty or either piece is missing.
    """
    if not ref_content:
        return {}

    # An unanchored `aeskey="` also matches the tail of `cdnthumbaeskey="`,
    # so whichever attribute came first used to win. Prefer the standalone
    # attribute; keep the loose match as a fallback so input that was accepted
    # before is still accepted.
    aeskey_match = (
        re.search(r'\baeskey="([^"]+)"', ref_content)
        or re.search(r'aeskey="([^"]+)"', ref_content)
    )
    if aeskey_match is None:
        return {}

    url_patterns = (
        r'cdnmidimgurl="([^"]+)"',
        r'cdnbigimgurl="([^"]+)"',
        r'cdnthumburl="([^"]+)"',
    )
    for pattern in url_patterns:
        url_match = re.search(pattern, ref_content)
        if url_match is not None:
            return {
                "aeskey": aeskey_match.group(1),
                "url": url_match.group(1),
            }
    return {}
def build_image_data_url(base64_str: str) -> str:
    """Turn base64 image text (optionally already a ``data:`` URL) into a normalised data URL.

    An existing ``data:...,`` prefix is discarded and the subtype is
    re-derived from the decoded bytes. Whitespace inside the payload (for
    example line-wrapped base64) is removed before decoding.

    Args:
        base64_str: Base64 text or a ``data:`` URL; ``None``/blank yields ``""``.

    Returns:
        ``data:image/<subtype>;base64,<payload>``, or ``""`` when the input is
        blank, is not valid base64, or decodes to zero bytes.
    """
    raw_base64 = str(base64_str or "").strip()
    if not raw_base64:
        return ""
    if raw_base64.startswith("data:") and "," in raw_base64:
        raw_base64 = raw_base64.split(",", 1)[1]

    # Drop embedded whitespace so strict validation below does not reject
    # line-wrapped payloads, and so the emitted URL is a single clean token.
    raw_base64 = "".join(raw_base64.split())

    try:
        # validate=True: without it b64decode silently discards characters
        # outside the alphabet, so arbitrary garbage "decodes" successfully.
        image_bytes = base64.b64decode(raw_base64, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return ""
    if not image_bytes:
        return ""

    image_type = _sniff_image_subtype(image_bytes)
    return f"data:image/{image_type};base64,{raw_base64}"


def _sniff_image_subtype(image_bytes: bytes) -> str:
    """Return the ``image/*`` subtype implied by the leading bytes, defaulting to ``"jpeg"``.

    Replaces ``imghdr.what``, which was removed from the standard library in
    Python 3.13.
    """
    if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if image_bytes[:6] in (b"GIF87a", b"GIF89a"):
        return "gif"
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "webp"
    if image_bytes.startswith(b"BM"):
        return "bmp"
    if image_bytes[:2] in (b"MM", b"II"):
        return "tiff"
    return "jpeg"