From 4f16390294179f677726ae9bd2c1842917a31b09 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 27 Jan 2026 17:12:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E6=94=AF=E6=8C=81=E5=9B=BE?= =?UTF-8?q?=E6=96=87=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/douyin_parser/main.py | 171 +++++++++++++++++++++++++--------- 1 file changed, 129 insertions(+), 42 deletions(-) diff --git a/plugins/douyin_parser/main.py b/plugins/douyin_parser/main.py index 8dd2150..7bd54bb 100644 --- a/plugins/douyin_parser/main.py +++ b/plugins/douyin_parser/main.py @@ -3,11 +3,13 @@ import re import time import traceback import requests +import io from typing import Dict, Any, List, Optional, Tuple from urllib.parse import urlparse from loguru import logger from pathlib import Path +from PIL import Image from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus @@ -133,43 +135,57 @@ class DouyinParserPlugin(MessagePluginInterface): original_url = self._clean_url(match.group(0)) self.LOG.info(f"发现抖音链接: {original_url}") - # 解析抖音视频 - video_info = self._parse_douyin(original_url) - if not video_info: - self.LOG.error(f"❌无法解析抖音视频信息") + media_info = self._parse_douyin(original_url) + if not media_info: + self.LOG.error(f"❌无法解析抖音媒资信息") return False, "解析失败" - video_url = video_info.get('url', '') - title = video_info.get('title', '无标题') - author = video_info.get('author', '未知作者') - cover = video_info.get('cover', '') - - if not video_url: - self.LOG.error(f"❌无法获取视频地址") - return False, "获取视频地址失败" - - # 根据模式选择发送方式 - if self.download_mode == "file": - # 下载并发送文件 - video_filename = f"video_{int(time.time())}.mp4" - save_path = os.path.join(self.download_dir, video_filename) - self.LOG.info(f"开始下载视频到: {save_path}") - mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path)) - if mp4_path: - await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path)) - return True, "发送视频文件成功" - else: - self.LOG.error(f"❌下载视频失败") - return False, "下载视频失败" + media_type = media_info.get('type', 'video') + if media_type == 'image': + imgs = media_info.get('images') or [] + if not imgs: + return False, "未获取到图片地址" + img_bytes_list: List[bytes] = [] + for u in imgs: + b = self._download_image_bytes(u) + if b: + img_bytes_list.append(b) + if not img_bytes_list: + return False, "下载图片失败" + merged = self._merge_images_vertical(img_bytes_list, 1242) + if not merged: + return False, "图片合并失败" + await self.bot.send_image_message((roomid if roomid else sender), merged) + return True, "发送合并图片成功" else: - # 发送卡片 - xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author, - des=title, - url=video_url, - thumburl=cover - ) - await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender)) - return True, "发送卡片成功" + video_url = media_info.get('url', '') + title = media_info.get('title', '无标题') + author = media_info.get('author', '未知作者') + cover = media_info.get('cover', '') + + if not video_url: + self.LOG.error(f"❌无法获取视频地址") + return False, "获取视频地址失败" + + if self.download_mode == "file": + video_filename = f"video_{int(time.time())}.mp4" + save_path = os.path.join(self.download_dir, video_filename) + self.LOG.info(f"开始下载视频到: {save_path}") + mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path)) + if mp4_path: + await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path)) + return True, "发送视频文件成功" + else: + self.LOG.error(f"❌下载视频失败") + return False, "下载视频失败" + else: + xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author, + des=title, + url=video_url, + thumburl=cover + ) + await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender)) + return True, "发送卡片成功" except DouyinParserError as e: self.LOG.error(f"抖音解析错误: {e}") @@ -191,24 +207,29 @@ class DouyinParserPlugin(MessagePluginInterface): if not data: return data default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" - cover = data.get('cover') - if isinstance(cover, str): - c = cover.strip().strip('`') - data['cover'] = c if c.startswith('http') else default_cover + media_type = data.get('type') or 'video' + if media_type == 'video': + cover = data.get('cover') + if isinstance(cover, str): + c = cover.strip().strip('`') + data['cover'] = c if c.startswith('http') else default_cover + else: + data['cover'] = default_cover else: - data['cover'] = default_cover + imgs = data.get('images') or [] + data['cover'] = imgs[0] if imgs else default_cover return data def _parse_douyin(self, url: str) -> Dict[str, Any]: try: clean_url = self._clean_url(url) primary = self._parse_from_internal_api(clean_url) - if primary and primary.get('url'): + if primary and (primary.get('url') or primary.get('images')): return self._clean_response_data(primary) secondary = self._parse_from_external_api(clean_url) if secondary and secondary.get('url'): return self._clean_response_data(secondary) - raise DouyinParserError("两种渠道均未获取到视频地址") + raise DouyinParserError("未获取到有效媒资数据") except Exception as e: self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}") raise DouyinParserError(f"未知错误: {str(e)}") @@ -230,6 +251,22 @@ class DouyinParserPlugin(MessagePluginInterface): if body.get("code") != 200: return None data = body.get("data") or {} + aweme_type = data.get("aweme_type") + author = (data.get("author") or {}) + nickname = author.get("nickname") or author.get("unique_id") or "未知作者" + if aweme_type == 68 or (data.get("images") or data.get("image_list")): + images_field = data.get("images") or [] + images: List[str] = [] + for img in images_field: + ulist = img.get("download_url_list") or img.get("url_list") or [] + chosen = self._prefer_image_url(ulist) + if chosen: + images.append(chosen) + desc = data.get("desc") or data.get("caption") or "" + result = {"type": "image", "images": images, "title": desc, "author": nickname, "cover": images[0] if images else ""} + if images: + return result + return None video = data.get("video") or {} bit_rates = video.get("bit_rate") or [] chosen_url = "" @@ -252,7 +289,7 @@ class DouyinParserPlugin(MessagePluginInterface): caption = data.get("caption") or "无标题" author = (data.get("author") or {}) nickname = author.get("nickname") or author.get("unique_id") or "未知作者" - result = {"url": chosen_url or "", "title": caption, "author": nickname, "cover": cover_url} + result = {"type": "video", "url": chosen_url or "", "title": caption, "author": nickname, "cover": cover_url} if result.get("url"): return result return None @@ -303,6 +340,21 @@ class DouyinParserPlugin(MessagePluginInterface): except Exception: return urls[0] if urls else None + def _prefer_image_url(self, urls: List[str]) -> Optional[str]: + try: + if not urls: + return None + cleaned = [(u or "").strip().strip("`") for u in urls if u] + jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None) + if jpeg: + return jpeg + webp = next((u for u in cleaned if ".webp" in u.lower()), None) + if webp: + return webp + return cleaned[0] + except Exception: + return urls[0] if urls else None + def _download_stream(self, url, save_path): """ 从指定URL读取视频流并保存到本地 @@ -340,3 +392,38 @@ class DouyinParserPlugin(MessagePluginInterface): except Exception as e: self.LOG.error(f"发生未知错误: {e}") return None + + def _download_image_bytes(self, url: str) -> Optional[bytes]: + try: + resp = requests.get(url, timeout=15, proxies=self._build_proxies()) + if resp.status_code == 200: + return resp.content + return None + except Exception: + return None + + def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]: + try: + pil_images: List[Image.Image] = [] + for b in images: + img = Image.open(io.BytesIO(b)) + if img.mode in ("RGBA", "P"): + img = img.convert("RGB") + w, h = img.size + if w != target_width: + ratio = target_width / float(w) + img = img.resize((target_width, int(h * ratio))) + pil_images.append(img) + if not pil_images: + return None + total_height = sum(i.size[1] for i in pil_images) + merged = Image.new("RGB", (target_width, total_height)) + y = 0 + for im in pil_images: + merged.paste(im, (0, y)) + y += im.size[1] + output = io.BytesIO() + merged.save(output, format="JPEG", quality=85) + return output.getvalue() + except Exception: + return None