201 lines
7.0 KiB
Python
201 lines
7.0 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import imghdr
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
|
|
|
from wechat_ipad import WechatAPIClient
|
|
|
|
|
|
def build_recent_image_context(
|
|
*,
|
|
message: Dict[str, Any],
|
|
room_id: str,
|
|
content: str,
|
|
quote_context: Dict[str, str],
|
|
get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
|
|
get_sender_name: Callable[[str, str], str],
|
|
image_config: Dict[str, Any],
|
|
) -> Dict[str, str]:
|
|
if quote_context:
|
|
return {}
|
|
latest_image = get_latest_image_message(
|
|
room_id,
|
|
before_timestamp=str(message.get("timestamp") or ""),
|
|
)
|
|
if not latest_image:
|
|
return {}
|
|
if not is_recent_image_followup(content, latest_image, image_config):
|
|
return {}
|
|
sender = str(latest_image.get("sender", "") or "")
|
|
sender_name = get_sender_name(room_id, sender) if sender else "未知成员"
|
|
return {
|
|
"sender_name": sender_name,
|
|
"image_path": str(latest_image.get("image_path", "") or ""),
|
|
"hint": "用户当前这句大概率是在追问这张最近图片",
|
|
"timestamp": str(latest_image.get("timestamp", "") or ""),
|
|
}
|
|
|
|
|
|
def is_recent_image_followup(content: str, latest_image: Optional[Dict[str, Any]] = None, image_config: Dict[str, Any] | None = None) -> bool:
|
|
text = str(content or "").strip().lower()
|
|
if not text:
|
|
return False
|
|
image_words = ["图", "图片", "照片", "截图", "表情包", "这张", "那张", "这图", "这p"]
|
|
ask_words = ["看看", "看下", "帮我看", "帮看看", "这个", "咋样", "什么", "识别", "分析", "评价", "点评"]
|
|
comment_words = [
|
|
"好看", "丑", "离谱", "抽象", "逆天", "蚌埠住", "绷不住", "乐", "笑死",
|
|
"色", "涩", "帅", "美", "绝了", "一般", "可以", "不行", "怪", "尬", "像",
|
|
]
|
|
pronoun_words = ["这个", "这", "那", "她", "他", "它"]
|
|
if any(word in text for word in image_words) and any(word in text for word in ask_words + comment_words):
|
|
return True
|
|
if latest_image and is_recent_image_close_enough(latest_image, image_config or {}):
|
|
short_text = len(text) <= 18
|
|
has_pronoun = any(word in text for word in pronoun_words)
|
|
has_comment = any(word in text for word in comment_words + ask_words)
|
|
if short_text and has_pronoun and has_comment:
|
|
return True
|
|
return False
|
|
|
|
|
|
def build_image_safety_hints(
|
|
*,
|
|
message: Dict[str, Any],
|
|
content: str,
|
|
quote_context: Dict[str, str],
|
|
image_context: Dict[str, str],
|
|
image_urls: List[str],
|
|
get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
|
|
image_config: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
if quote_context.get("quote_type_label") == "引用图片":
|
|
return {
|
|
"suspected": True,
|
|
"has_visual_context": bool(image_urls),
|
|
"reason": "用户当前是在引用图片后发言",
|
|
}
|
|
if image_context:
|
|
has_visual_context = bool(image_urls)
|
|
reason = "用户当前大概率在接最近一张群图片"
|
|
if not has_visual_context:
|
|
reason = "识别到图片跟评,但本地图片未成功附带给模型"
|
|
return {
|
|
"suspected": True,
|
|
"has_visual_context": has_visual_context,
|
|
"reason": reason,
|
|
}
|
|
latest_image = get_latest_image_message(
|
|
str(message.get("roomid") or ""),
|
|
before_timestamp=str(message.get("timestamp") or ""),
|
|
)
|
|
if latest_image and is_recent_image_followup(content, latest_image, image_config):
|
|
return {
|
|
"suspected": True,
|
|
"has_visual_context": False,
|
|
"reason": "最近刚出现图片,但这次没有拿到图片内容",
|
|
}
|
|
return {
|
|
"suspected": False,
|
|
"has_visual_context": bool(image_urls),
|
|
"reason": "",
|
|
}
|
|
|
|
|
|
def is_recent_image_close_enough(latest_image: Dict[str, Any], image_config: Dict[str, Any]) -> bool:
|
|
max_gap_minutes = max(int(image_config.get("recent_followup_window_minutes", 5) or 5), 1)
|
|
image_time = parse_message_time(str(latest_image.get("timestamp") or ""))
|
|
if not image_time:
|
|
return False
|
|
return (datetime.now() - image_time).total_seconds() <= max_gap_minutes * 60
|
|
|
|
|
|
def parse_message_time(value: str) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
|
|
try:
|
|
return datetime.strptime(value, fmt)
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
async def prepare_quote_image_inputs(
|
|
*,
|
|
bot: WechatAPIClient,
|
|
quote_context: Dict[str, str],
|
|
log_event: Callable[..., None],
|
|
) -> List[str]:
|
|
if not quote_context or quote_context.get("quote_type_label") != "引用图片":
|
|
return []
|
|
ref_content = quote_context.get("raw_ref_content", "") or ""
|
|
image_info = extract_quote_image_info(ref_content)
|
|
if not image_info:
|
|
return []
|
|
try:
|
|
base64_str = await bot.download_image(
|
|
aeskey=image_info["aeskey"],
|
|
cdnmidimgurl=image_info["url"],
|
|
)
|
|
except Exception as exc:
|
|
log_event("quote_image_fail", reason=f"download:{exc}")
|
|
return []
|
|
data_url = build_image_data_url(base64_str)
|
|
if not data_url:
|
|
log_event("quote_image_fail", reason="invalid_base64")
|
|
return []
|
|
return [data_url]
|
|
|
|
|
|
def build_local_image_data_url(image_path: str, main_path: Path) -> str:
|
|
if not image_path:
|
|
return ""
|
|
relative_path = image_path.lstrip("/\\").replace("/", "\\")
|
|
full_path = main_path / relative_path
|
|
if not full_path.exists():
|
|
return ""
|
|
try:
|
|
image_bytes = full_path.read_bytes()
|
|
except Exception:
|
|
return ""
|
|
image_type = imghdr.what(None, h=image_bytes) or "jpeg"
|
|
raw_base64 = base64.b64encode(image_bytes).decode("utf-8")
|
|
return f"data:image/{image_type};base64,{raw_base64}"
|
|
|
|
|
|
def extract_quote_image_info(ref_content: str) -> Dict[str, str]:
|
|
if not ref_content:
|
|
return {}
|
|
aeskey_match = re.search(r'aeskey="([^"]+)"', ref_content)
|
|
if not aeskey_match:
|
|
return {}
|
|
url_match = re.search(r'cdnmidimgurl="([^"]+)"', ref_content)
|
|
if not url_match:
|
|
url_match = re.search(r'cdnbigimgurl="([^"]+)"', ref_content)
|
|
if not url_match:
|
|
url_match = re.search(r'cdnthumburl="([^"]+)"', ref_content)
|
|
if not url_match:
|
|
return {}
|
|
return {
|
|
"aeskey": aeskey_match.group(1),
|
|
"url": url_match.group(1),
|
|
}
|
|
|
|
|
|
def build_image_data_url(base64_str: str) -> str:
|
|
raw_base64 = str(base64_str or "").strip()
|
|
if not raw_base64:
|
|
return ""
|
|
if "," in raw_base64 and raw_base64.startswith("data:"):
|
|
raw_base64 = raw_base64.split(",", 1)[1]
|
|
try:
|
|
image_bytes = base64.b64decode(raw_base64)
|
|
except Exception:
|
|
return ""
|
|
image_type = imghdr.what(None, h=image_bytes) or "jpeg"
|
|
return f"data:image/{image_type};base64,{raw_base64}"
|