尝试支持图文信息

This commit is contained in:
liuwei
2026-01-27 17:12:55 +08:00
parent 4fb06fabe4
commit 4f16390294

View File

@@ -3,11 +3,13 @@ import re
import time import time
import traceback import traceback
import requests import requests
import io
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse from urllib.parse import urlparse
from loguru import logger from loguru import logger
from pathlib import Path from pathlib import Path
from PIL import Image
from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_interface import PluginStatus
@@ -133,24 +135,39 @@ class DouyinParserPlugin(MessagePluginInterface):
original_url = self._clean_url(match.group(0)) original_url = self._clean_url(match.group(0))
self.LOG.info(f"发现抖音链接: {original_url}") self.LOG.info(f"发现抖音链接: {original_url}")
# 解析抖音视频 media_info = self._parse_douyin(original_url)
video_info = self._parse_douyin(original_url) if not media_info:
if not video_info: self.LOG.error(f"❌无法解析抖音媒资信息")
self.LOG.error(f"❌无法解析抖音视频信息")
return False, "解析失败" return False, "解析失败"
video_url = video_info.get('url', '') media_type = media_info.get('type', 'video')
title = video_info.get('title', '无标题') if media_type == 'image':
author = video_info.get('author', '未知作者') imgs = media_info.get('images') or []
cover = video_info.get('cover', '') if not imgs:
return False, "未获取到图片地址"
img_bytes_list: List[bytes] = []
for u in imgs:
b = self._download_image_bytes(u)
if b:
img_bytes_list.append(b)
if not img_bytes_list:
return False, "下载图片失败"
merged = self._merge_images_vertical(img_bytes_list, 1242)
if not merged:
return False, "图片合并失败"
await self.bot.send_image_message((roomid if roomid else sender), merged)
return True, "发送合并图片成功"
else:
video_url = media_info.get('url', '')
title = media_info.get('title', '无标题')
author = media_info.get('author', '未知作者')
cover = media_info.get('cover', '')
if not video_url: if not video_url:
self.LOG.error(f"❌无法获取视频地址") self.LOG.error(f"❌无法获取视频地址")
return False, "获取视频地址失败" return False, "获取视频地址失败"
# 根据模式选择发送方式
if self.download_mode == "file": if self.download_mode == "file":
# 下载并发送文件
video_filename = f"video_{int(time.time())}.mp4" video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename) save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}") self.LOG.info(f"开始下载视频到: {save_path}")
@@ -162,7 +179,6 @@ class DouyinParserPlugin(MessagePluginInterface):
self.LOG.error(f"❌下载视频失败") self.LOG.error(f"❌下载视频失败")
return False, "下载视频失败" return False, "下载视频失败"
else: else:
# 发送卡片
xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author, xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author,
des=title, des=title,
url=video_url, url=video_url,
@@ -191,24 +207,29 @@ class DouyinParserPlugin(MessagePluginInterface):
if not data: if not data:
return data return data
default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
media_type = data.get('type') or 'video'
if media_type == 'video':
cover = data.get('cover') cover = data.get('cover')
if isinstance(cover, str): if isinstance(cover, str):
c = cover.strip().strip('`') c = cover.strip().strip('`')
data['cover'] = c if c.startswith('http') else default_cover data['cover'] = c if c.startswith('http') else default_cover
else: else:
data['cover'] = default_cover data['cover'] = default_cover
else:
imgs = data.get('images') or []
data['cover'] = imgs[0] if imgs else default_cover
return data return data
def _parse_douyin(self, url: str) -> Dict[str, Any]: def _parse_douyin(self, url: str) -> Dict[str, Any]:
try: try:
clean_url = self._clean_url(url) clean_url = self._clean_url(url)
primary = self._parse_from_internal_api(clean_url) primary = self._parse_from_internal_api(clean_url)
if primary and primary.get('url'): if primary and (primary.get('url') or primary.get('images')):
return self._clean_response_data(primary) return self._clean_response_data(primary)
secondary = self._parse_from_external_api(clean_url) secondary = self._parse_from_external_api(clean_url)
if secondary and secondary.get('url'): if secondary and secondary.get('url'):
return self._clean_response_data(secondary) return self._clean_response_data(secondary)
raise DouyinParserError("两种渠道均未获取到视频地址") raise DouyinParserError("未获取到有效媒资数据")
except Exception as e: except Exception as e:
self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}") self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
raise DouyinParserError(f"未知错误: {str(e)}") raise DouyinParserError(f"未知错误: {str(e)}")
@@ -230,6 +251,22 @@ class DouyinParserPlugin(MessagePluginInterface):
if body.get("code") != 200: if body.get("code") != 200:
return None return None
data = body.get("data") or {} data = body.get("data") or {}
aweme_type = data.get("aweme_type")
author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
if aweme_type == 68 or (data.get("images") or data.get("image_list")):
images_field = data.get("images") or []
images: List[str] = []
for img in images_field:
ulist = img.get("download_url_list") or img.get("url_list") or []
chosen = self._prefer_image_url(ulist)
if chosen:
images.append(chosen)
desc = data.get("desc") or data.get("caption") or ""
result = {"type": "image", "images": images, "title": desc, "author": nickname, "cover": images[0] if images else ""}
if images:
return result
return None
video = data.get("video") or {} video = data.get("video") or {}
bit_rates = video.get("bit_rate") or [] bit_rates = video.get("bit_rate") or []
chosen_url = "" chosen_url = ""
@@ -252,7 +289,7 @@ class DouyinParserPlugin(MessagePluginInterface):
caption = data.get("caption") or "无标题" caption = data.get("caption") or "无标题"
author = (data.get("author") or {}) author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者" nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
result = {"url": chosen_url or "", "title": caption, "author": nickname, "cover": cover_url} result = {"type": "video", "url": chosen_url or "", "title": caption, "author": nickname, "cover": cover_url}
if result.get("url"): if result.get("url"):
return result return result
return None return None
@@ -303,6 +340,21 @@ class DouyinParserPlugin(MessagePluginInterface):
except Exception: except Exception:
return urls[0] if urls else None return urls[0] if urls else None
def _prefer_image_url(self, urls: List[str]) -> Optional[str]:
try:
if not urls:
return None
cleaned = [(u or "").strip().strip("`") for u in urls if u]
jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None)
if jpeg:
return jpeg
webp = next((u for u in cleaned if ".webp" in u.lower()), None)
if webp:
return webp
return cleaned[0]
except Exception:
return urls[0] if urls else None
def _download_stream(self, url, save_path): def _download_stream(self, url, save_path):
""" """
从指定URL读取视频流并保存到本地 从指定URL读取视频流并保存到本地
@@ -340,3 +392,38 @@ class DouyinParserPlugin(MessagePluginInterface):
except Exception as e: except Exception as e:
self.LOG.error(f"发生未知错误: {e}") self.LOG.error(f"发生未知错误: {e}")
return None return None
def _download_image_bytes(self, url: str) -> Optional[bytes]:
try:
resp = requests.get(url, timeout=15, proxies=self._build_proxies())
if resp.status_code == 200:
return resp.content
return None
except Exception:
return None
def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]:
try:
pil_images: List[Image.Image] = []
for b in images:
img = Image.open(io.BytesIO(b))
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
w, h = img.size
if w != target_width:
ratio = target_width / float(w)
img = img.resize((target_width, int(h * ratio)))
pil_images.append(img)
if not pil_images:
return None
total_height = sum(i.size[1] for i in pil_images)
merged = Image.new("RGB", (target_width, total_height))
y = 0
for im in pil_images:
merged.paste(im, (0, y))
y += im.size[1]
output = io.BytesIO()
merged.save(output, format="JPEG", quality=85)
return output.getvalue()
except Exception:
return None