重构:新增定时插件业务逻辑内聚到各自插件目录

- daily_news 插件内置百度新闻与60s图片获取逻辑,移除对 base.func_news 的业务依赖
- epic_free 插件内置周五判断与免费游戏抓取逻辑,移除对 base.func_epic 的业务依赖
- daily_ranking 插件内置排行生成与积分奖励逻辑,不再依赖 MessageStorage 业务封装
- sehuatang_push 改为引用插件目录内的抓取与PDF生成实现,将核心业务代码迁入插件目录
- 确保新插件可独立承载自身业务逻辑,平台层仅提供调度与基础设施能力
This commit is contained in:
liuwei
2026-04-16 16:16:07 +08:00
parent 547c5533d7
commit bb73d07809
6 changed files with 717 additions and 16 deletions

View File

@@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
import asyncio
import base64
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import requests
from base.func_news import News
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager
@@ -108,9 +108,9 @@ class DailyNewsPlugin(MessagePluginInterface):
return {"success": False, "summary": "没有可推送目标群", "detail": {"target_count": 0}}
try:
# 新闻抓取为同步逻辑,放入线程池避免阻塞调度主循环
text_news = await asyncio.to_thread(News().get_baidu_news)
image_url = await asyncio.to_thread(News().get_news_60s)
# 新闻抓取逻辑内聚在插件内,避免依赖外部业务模块
text_news = await asyncio.to_thread(self._get_baidu_news)
image_url = await asyncio.to_thread(self._get_news_60s_image)
except Exception as e:
return {"success": False, "summary": f"新闻抓取失败: {e}", "detail": {"error": str(e)}}
@@ -151,3 +151,41 @@ class DailyNewsPlugin(MessagePluginInterface):
resp = requests.get(url, timeout=15)
resp.raise_for_status()
return base64.b64encode(resp.content).decode("utf-8")
@staticmethod
def _get_baidu_news() -> str:
    """Fetch the Baidu realtime hot-search board and render it as plain text.

    Returns:
        A header line with the current local date and weekday, a blank line,
        then one ``N :#title`` line per hot-search entry (spaces inside a
        title are replaced with underscores).

    Raises:
        requests.RequestException: If the HTTP request fails or the server
            answers with an error status; the caller is expected to handle it.
    """
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) "
            "Gecko/20100101 Firefox/110.0"
        )
    }
    url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
    weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]

    now = datetime.now()
    # Format the date by hand rather than with strftime(): non-ASCII text in
    # a strftime format is handled by the platform C library and can be
    # locale dependent. This also restores the trailing "日" that the
    # previous "%Y年%m月%d" format dropped.
    current_date = f"{now.year}年{now.month:02d}月{now.day:02d}日"
    parts = [f"当前日期:{current_date} {weekdays[now.weekday()]}\n\n"]

    resp = requests.get(url, headers=headers, timeout=15)
    resp.raise_for_status()
    payload = resp.json()

    # Any level of the payload may be missing or null; treat that as
    # "no entries" instead of failing with AttributeError on None.
    data = payload.get("data") if isinstance(payload, dict) else None
    cards = data.get("cards") if isinstance(data, dict) else None

    index = 1
    for card in cards or []:
        if not isinstance(card, dict):
            continue
        for block in card.get("content") or []:
            if not isinstance(block, dict):
                continue
            for article in block.get("content") or []:
                if isinstance(article, dict) and "word" in article:
                    title = str(article.get("word", "")).strip().replace(" ", "_")
                    parts.append(f"{index} :#{title}\n")
                    index += 1
    return "".join(parts)
@staticmethod
def _get_news_60s_image() -> Optional[str]:
    """Ask the 60s news service for the address of today's news image.

    Returns:
        The value of ``data.image`` from the JSON response, or ``None`` when
        the response has no usable ``data`` object.

    Raises:
        requests.RequestException: If the HTTP request fails or the server
            answers with an error status; the caller is expected to handle it.
    """
    api_url = "http://192.168.2.32:4399/v2/60s"
    resp = requests.get(api_url, timeout=15)
    resp.raise_for_status()
    payload = resp.json()

    # A non-dict payload or ``"data": null`` previously raised AttributeError
    # from ``.get`` on a non-dict value; report "no image" instead.
    if not isinstance(payload, dict):
        return None
    data = payload.get("data")
    if not isinstance(data, dict):
        return None
    return data.get("image")

View File

@@ -1,10 +1,13 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.message_storage import MessageStorageDB
from db.points_db import PointSource, PointsDBOperator
from utils.robot_cmd.robot_command import GroupBotManager
from utils.wechat.message_to_db import MessageStorage
from utils.wechat.contact_manager import ContactManager
class DailyRankingPlugin(MessagePluginInterface):
@@ -44,11 +47,16 @@ class DailyRankingPlugin(MessagePluginInterface):
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.message_storage: Optional[MessageStorage] = None
self.message_db: Optional[MessageStorageDB] = None
self.points_db: Optional[PointsDBOperator] = None
def initialize(self, context: Dict[str, Any]) -> bool:
# 与历史系统逻辑保持一致,直接复用 MessageStorage 的排行生成能力。
self.message_storage = MessageStorage()
db_manager = context.get("db_manager")
if db_manager is None:
return False
# 排行业务逻辑下沉到插件内,仅复用 DB Operator 作为数据访问层。
self.message_db = MessageStorageDB(db_manager)
self.points_db = PointsDBOperator(db_manager)
return True
def start(self) -> bool:
@@ -89,8 +97,8 @@ class DailyRankingPlugin(MessagePluginInterface):
}
if not self.bot:
return {"success": False, "summary": "bot 未注入", "detail": {}}
if not self.message_storage:
return {"success": False, "summary": "message_storage 未初始化", "detail": {}}
if not self.message_db or not self.points_db:
return {"success": False, "summary": "排行依赖未初始化", "detail": {}}
target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()]
if not target_groups:
@@ -105,7 +113,7 @@ class DailyRankingPlugin(MessagePluginInterface):
failed_groups = {}
for gid in target_groups:
try:
ok, text = await self.message_storage.generate_and_send_ranking(gid, {})
ok, text = await self._generate_and_send_ranking(gid)
if ok and text:
await self.bot.send_text_message(gid, text)
success_groups.append(gid)
@@ -121,3 +129,48 @@ class DailyRankingPlugin(MessagePluginInterface):
"failed_groups": failed_groups,
},
}
async def _generate_and_send_ranking(self, group_id: str) -> Tuple[bool, str]:
"""生成并奖励发言排行(插件内实现)。"""
if not self.message_db or not self.points_db:
return False, "排行依赖未初始化"
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
rows = self.message_db.get_speech_ranking(yesterday, group_id, limit=20)
if not rows:
return False, f"📊 {yesterday} 没有发言记录"
contact_manager = ContactManager.get_instance()
ranking_lines = [f"🏆 {yesterday} 发言排行榜 🏆"]
for rank, row in enumerate(rows, start=1):
wxid = row.get("wx_id")
speech_count = int(row.get("speech_count") or 0)
display_name = contact_manager.get_group_name(group_id, wxid) or wxid
reward = 0
if rank == 1:
reward = 30
ranking_lines.append(f"🥇🐲 {rank}.{display_name}: {speech_count}次 🔥 +{reward}积分")
elif rank == 2:
reward = 20
ranking_lines.append(f"🥈 {rank}.{display_name}: {speech_count}次 ✨ +{reward}积分")
elif rank == 3:
reward = 10
ranking_lines.append(f"🥉 {rank}.{display_name}: {speech_count}次 👏 +{reward}积分")
elif rank <= 10:
reward = 5
ranking_lines.append(f"🌟 {rank}.{display_name}: {speech_count}次 +{reward}积分")
else:
reward = 3
ranking_lines.append(f"👍 {rank}.{display_name}: {speech_count}次 +{reward}积分")
if reward > 0:
self.points_db.add_points(
wxid,
group_id,
reward,
PointSource.OTHER,
f"{yesterday}发言排行第{rank}名奖励",
)
return True, "\n".join(ranking_lines)

View File

@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from base.func_epic import get_free, is_friday
import requests
from bs4 import BeautifulSoup
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager
@@ -89,7 +92,7 @@ class EpicFreePlugin(MessagePluginInterface):
payload = context.get("payload") or {}
force = bool(payload.get("force", False))
if not force and not is_friday():
if not force and not self._is_friday():
# 非周五时默认跳过;手动触发可通过 payload.force 强制执行。
return {"success": True, "summary": "今天不是周五,已跳过 Epic 播报", "detail": {"skipped": True}}
@@ -103,7 +106,7 @@ class EpicFreePlugin(MessagePluginInterface):
return {"success": False, "summary": "没有可推送目标群", "detail": {"target_count": 0}}
try:
text = get_free()
text = self._get_free_games()
except Exception as e:
return {"success": False, "summary": f"获取 Epic 免费游戏失败: {e}", "detail": {"error": str(e)}}
@@ -126,3 +129,58 @@ class EpicFreePlugin(MessagePluginInterface):
"force": force,
},
}
@staticmethod
def _is_friday() -> bool:
"""判断是否周五(插件内实现)。"""
return datetime.today().weekday() == 4
@staticmethod
def _get_free_games() -> str:
    """Scrape the free-game table from steamstats.cn and render it as text.

    Returns:
        A header line with the Epic free-games link followed by one
        multi-line entry per table row that has at least seven cells, or the
        header plus a "nothing found" notice when the page has no ``tbody``.

    Raises:
        requests.RequestException: If the HTTP request fails or the server
            answers with an error status.
    """

    def compact(node) -> str:
        """Text of a node with spaces removed; empty when ``.string`` is None."""
        return (node.string or "").replace(" ", "").strip()

    page = requests.get(
        "https://steamstats.cn/xi",
        headers={
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36 Edg/90.0.818.41"
            )
        },
        timeout=20,
    )
    page.raise_for_status()
    # Let requests guess the charset from the body before decoding.
    page.encoding = page.apparent_encoding

    header = "今日喜加一 :https://store.epicgames.com/en-US/free-games\n"
    table_body = BeautifulSoup(page.text, "html.parser").find("tbody")
    if not table_body:
        return header + "未抓取到免费游戏列表"

    pieces = [header]
    seq = 0
    for tr in table_body.find_all("tr"):
        cells = tr.find_all("td")
        if len(cells) < 7:
            continue
        seq += 1

        # The game name keeps its inner spaces; the other cells are compacted.
        name = (cells[1].string or "").strip()
        gametype = compact(cells[2])
        start = compact(cells[3])
        end = compact(cells[4])
        permanent = compact(cells[5])

        platform_span = cells[6].find("span")
        origin = compact(platform_span) if platform_span else ""

        # Keep the last non-empty href found in the platform cell.
        link = ""
        for anchor in cells[6].find_all("a"):
            candidate = anchor.get("href", "")
            if candidate:
                link = candidate

        pieces.append(
            f"序号:{seq}\n"
            f"游戏名称:{name}\n"
            f"DLC/game{gametype}\n"
            f"开始时间:{start}\n"
            f"结束时间:{end}\n"
            f"是否永久:{permanent}\n"
            f"平台:{origin}\n"
            f"URL{link}\n"
        )
    return "".join(pieces)

View File

@@ -4,8 +4,8 @@ from typing import Any, Dict, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.sehuatang.shehuatang import pdf_file_path
from utils.sehuatang.shehuatang_undetected import pdf_file_path_undetected
from plugins.sehuatang_push.shehuatang import pdf_file_path
from plugins.sehuatang_push.shehuatang_undetected import pdf_file_path_undetected
class SehuatangPushPlugin(MessagePluginInterface):

View File

@@ -0,0 +1,311 @@
import time
import os
import requests
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from reportlab.lib.pagesizes import letter, A3
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfbase import pdfmetrics
from datetime import datetime
from PIL import Image as PILImage
import re
from PyPDF2 import PdfReader, PdfWriter
from loguru import logger
# download_image 函数保持不变
def download_image(url, timeout=15):
    """Download a JPG/JPEG/PNG image into memory.

    The URL is only fetched when it ends in ``.jpg``, ``.jpeg`` or ``.png``
    (case-insensitive). No size filtering is applied.

    Args:
        url: Address of the image.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled image host would block PDF generation forever.

    Returns:
        A ``BytesIO`` holding the image bytes, or ``None`` when the URL is
        empty, has an unsupported extension, or the download fails.
    """
    try:
        if not url or not url.lower().endswith(('.jpg', '.jpeg', '.png')):
            return None
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://tu.a7nz4.us',
        }
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.warning(f"下载图片失败: {e}")
        return None
def fetch_and_create_pdf(url):
"""Scrape the forum listing at ``url`` and build a PDF digest of its threads.

Loads the listing in headless Chrome via Selenium, fetches each thread page
with a ``requests`` session that reuses the browser cookies, and writes the
thread titles, magnet links and images into an A3 PDF under ``temp/JAV``
(three directory levels above this file). The finished PDF is passed to
``add_pdf_encryption``.

Args:
url: Address of the forum listing page to scrape.

Returns:
The absolute path of the generated PDF, or "" when scraping or PDF
generation fails.
"""
driver = None
try:
# Configure Selenium.
options = Options()
options.add_argument('--headless') # headless mode (NOTE(review): the original comment said "new headless mode", but the flag passed is plain --headless)
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage') # Linux-specific setting
options.add_argument('--disable-logging')
options.add_argument('--log-level=3')
options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
# Choose the ChromeDriver location by operating system.
if os.name == 'nt': # Windows
# Resolves to <parent of this file's directory>/utils/chromedriver/chromedriver.exe.
chrome_driver_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"utils", "chromedriver", "chromedriver.exe"
)
else: # Linux
chrome_driver_path = '/usr/bin/chromedriver' # fixed system location of chromedriver
try:
# On Windows, fall back to a managed download when the bundled driver is absent.
if os.name == 'nt' and not os.path.exists(chrome_driver_path):
chrome_driver_path = ChromeDriverManager().install()
service = Service(chrome_driver_path, log_path=os.devnull)
driver = webdriver.Chrome(service=service, options=options)
except Exception as e:
# Any start-up failure: retry once with a driver installed by webdriver_manager.
logger.debug(f"初始化ChromeDriver失败: {e}")
chrome_driver_path = ChromeDriverManager().install()
service = Service(chrome_driver_path, log_path=os.devnull)
driver = webdriver.Chrome(service=service, options=options)
# Load the target page.
driver.get(url)
# First attempt at the age-confirmation link: wait up to 5 seconds for it to be clickable.
try:
enter_button = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "满18岁请点此进入")]')))
enter_button.click()
logger.debug("点击了满18岁按钮")
except Exception as e:
logger.warning(f"未找到满18岁按钮跳过此步骤: {e}")
# Wait up to 10 seconds for the thread rows to be present.
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'tbody[id^="normalthread"]')))
# Second attempt at the age-confirmation link, this time without waiting.
try:
enter_button = driver.find_element(By.XPATH, '//a[contains(text(), "满18岁请点此进入")]')
enter_button.click()
logger.debug("点击了满18岁按钮")
time.sleep(5)
except Exception as e:
logger.warning(f"未找到满18岁按钮跳过此步骤: {e}")
# Parse the rendered page.
html = driver.page_source
soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')
posts = soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')})
# Today's date, used in the PDF file name.
today = datetime.now().strftime('%Y-%m-%d')
# Register the Chinese font; the path is relative to the current working directory.
pdfmetrics.registerFont(TTFont('SimHei', 'fonts/simhei.ttf'))
styles = getSampleStyleSheet()
# Paragraph styles.
title_style = styles['Heading1']
title_style.fontName = 'SimHei'
title_style.fontSize = 14
title_style.textColor = colors.red
title_style.bold = True
normal_style = styles['Normal']
normal_style.fontName = 'SimHei'
normal_style.fontSize = 14
content = []
# Keep the posts that contain a span with class "xi1", then reverse their order.
# NOTE(review): the original comment calls this "today's posts"; no date is compared
# here, so presumably that span marks a same-day timestamp - verify.
today_posts = []
for post in posts:
post_time_span = post.find('span', {'class': 'xi1'})
if post_time_span:
today_posts.append(post)
today_posts = today_posts[::-1] # process in reverse order
# PDF destination: the temp/JAV directory.
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
pdf_filename = os.path.join(base_dir, 'temp', 'JAV', f"JAV-{today}-{len(today_posts)}.pdf")
# Make sure the directory exists.
pdf_dir = os.path.dirname(pdf_filename)
if not os.path.exists(pdf_dir):
os.makedirs(pdf_dir)
doc = SimpleDocTemplate(pdf_filename, pagesize=A3)
# Width and height of the printable area.
page_width, page_height = A3
content_width = page_width - doc.rightMargin - doc.leftMargin
content_height = page_height - doc.topMargin - doc.bottomMargin
# Maximum image size, leaving some margin.
max_image_width = content_width * 0.95
max_image_height = content_height * 0.7 # leave room for text and other elements
# Session for the thread pages, carrying over the browser's cookies.
session = requests.Session()
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Referer': 'https://www.sehuatang.net/'
})
for c in driver.get_cookies():
try:
session.cookies.set(c['name'], c['value'], domain=c.get('domain'), path=c.get('path', '/'))
except Exception:
# Retry without domain/path when the scoped form is rejected.
session.cookies.set(c['name'], c['value'])
# Walk the posts.
for post in today_posts:
title = post.find('a', {'class': 's xst'})
if title:
post_title = title.get_text()
post_url = title.get('href')
logger.info(post_title)
# Fetch the thread page. A missing href makes this concatenation raise TypeError,
# which is handled by the outermost except (the function then returns "").
post_page_url = 'https://www.sehuatang.net/' + post_url
try:
resp = session.get(post_page_url, timeout=15)
resp.raise_for_status()
post_html = resp.text
except Exception as e:
logger.warning(f"获取帖子内容失败: {e}")
continue
post_soup = BeautifulSoup(post_html, 'html.parser', from_encoding='utf-8')
content_div = post_soup.find('div', {'class': 't_fsz'})
if content_div:
# Extract the text and the magnet links; a link ends at a space or a CJK character.
post_text = content_div.get_text(strip=True)
magnet_links = re.findall(r'magnet:\?[^ \u4e00-\u9fff]+', post_text)
# Title.
content.append(Paragraph(f" {post_title}", title_style))
content.append(Spacer(1, 5))
# Magnet links.
if magnet_links:
for magnet_link in magnet_links:
content.append(Paragraph(f"<br /><b>{magnet_link}</b><br />", normal_style))
content.append(Spacer(1, 12))
# Images: only <img> tags whose "zoomfile" attribute contains "http" are used.
image_links = []
images = content_div.find_all('img')
for img in images:
if img.get('zoomfile') and 'http' in img.get('zoomfile'):
image_links.append(img.get('zoomfile'))
if image_links:
for img_link in image_links:
image = download_image(img_link)
if image:
try:
# Read the image size with PIL.
with PILImage.open(image) as img:
img_width, img_height = img.size
# Scale factor that makes the image fit the page.
scale_width = max_image_width / img_width
scale_height = max_image_height / img_height
scale = min(scale_width, scale_height, 1.0) # never enlarge beyond the original size
# Scaled dimensions.
new_width = img_width * scale
new_height = img_height * scale
# Rewind the stream and hand reportlab its own copy of the bytes.
image.seek(0)
img_stream = BytesIO(image.getvalue())
# Add the image with the computed size.
content.append(Image(img_stream, width=new_width, height=new_height))
content.append(Spacer(1, 4))
logger.debug(
f"处理图片: 原始尺寸 {img_width}x{img_height}, 新尺寸 {new_width}x{new_height}")
except Exception as e:
logger.error(f"处理图片时出错: {e}")
# Page break after each post except the last one.
if post != today_posts[-1]:
content.append(PageBreak())
# Build the PDF.
try:
doc.build(content)
absolute_pdf_path = os.path.abspath(pdf_filename)
logger.info(f"PDF saved as {absolute_pdf_path}")
# Encrypt the PDF.
add_pdf_encryption(absolute_pdf_path)
return absolute_pdf_path
except Exception as e:
logger.error(f"生成PDF时出错: {e}")
# PDF generation failed: signal it with an empty path.
return ""
except Exception as e:
logger.error(f"抓取帖子时出错: {e}")
# Scraping failed: signal it with an empty path.
return ""
finally:
# Always shut the driver down, whatever happened above.
if driver:
try:
driver.quit()
logger.debug("Chrome driver已成功关闭")
except Exception as e:
logger.error(f"关闭Chrome driver时出错: {e}")
# Last resort: terminate the driver service process directly.
try:
import psutil
process = psutil.Process(driver.service.process.pid)
process.terminate()
logger.debug("已强制终止Chrome进程")
except Exception as e2:
logger.error(f"强制终止Chrome进程失败: {e2}")
# add_pdf_encryption 和 pdf_file_path 函数保持不变
def add_pdf_encryption(pdf_file, password="4000"):
    """Encrypt a PDF in place with PyPDF2 (best effort).

    The encrypted copy is written to a sibling ``.tmp`` file and then moved
    over the original, so a failure while writing cannot leave ``pdf_file``
    truncated. Failures are logged, not raised; in that case the original
    file is left untouched and unencrypted.

    Args:
        pdf_file: Path of the PDF to encrypt.
        password: Password applied to the PDF.
    """
    tmp_file = f"{pdf_file}.tmp"
    try:
        pdf_writer = PdfWriter()
        pdf_reader = PdfReader(pdf_file)
        for page in pdf_reader.pages:
            pdf_writer.add_page(page)
        pdf_writer.encrypt(password)
        with open(tmp_file, "wb") as output_pdf:
            pdf_writer.write(output_pdf)
        # Replace the original only after the encrypted copy is complete.
        os.replace(tmp_file, pdf_file)
        logger.debug(f"PDF加密成功密码为: {password}")
    except Exception as e:
        logger.error(f"PDF加密失败: {e}")
        # Do not leave a partial temp file behind.
        try:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
        except OSError:
            pass
def pdf_file_path():
    """Generate the forum PDF and report where it is.

    Returns:
        ``(True, pdf_path)`` when the PDF was generated, otherwise
        ``(False, default_path)`` where ``default_path`` is ``default.pdf``
        next to this file.
    """

    def fallback_path():
        """Path of the placeholder PDF that sits next to this module."""
        here = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(here, "default.pdf")

    try:
        listing_url = 'https://www.sehuatang.net/forum.php?mod=forumdisplay&fid=103&filter=typeid&typeid=481'
        generated = fetch_and_create_pdf(listing_url)
        if not generated:
            # Generation reported failure with an empty path.
            default_path = fallback_path()
            logger.info(f"PDF生成失败返回默认路径: {default_path}")
            return False, default_path
        logger.info(f"返回的PDF文件路径{generated}")
        return True, generated
    except Exception as e:
        logger.error(f"生成PDF路径时出错: {e}")
        return False, fallback_path()


if __name__ == "__main__":
    pdf_file_path()

View File

@@ -0,0 +1,241 @@
import time
import os
import requests
from io import BytesIO
import undetected_chromedriver as uc
# 注意不要禁用析构函数否则会导致Chrome进程泄漏
# if os.name == 'nt':
# try:
# uc.Chrome.__del__ = lambda self: None
# except Exception:
# pass
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from reportlab.lib.pagesizes import A3
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfbase import pdfmetrics
from datetime import datetime
from PIL import Image as PILImage
import re
from PyPDF2 import PdfReader, PdfWriter
from loguru import logger
def download_image(url, session):
    """Fetch an image through the shared session so its cookies are reused.

    Only URLs ending in ``.jpg``, ``.jpeg`` or ``.png`` (case-insensitive)
    are requested.

    Args:
        url: Address of the image.
        session: Object whose ``get(url, timeout=...)`` performs the request.

    Returns:
        A ``BytesIO`` with the image bytes, or ``None`` for an unsupported
        extension or after any failure (which is logged as a warning).
    """
    supported = ('.jpg', '.jpeg', '.png')
    try:
        if url.lower().endswith(supported):
            reply = session.get(url, timeout=15)
            reply.raise_for_status()
            return BytesIO(reply.content)
        return None
    except Exception as e:
        logger.warning(f"下载图片失败: {e}")
        return None
def add_pdf_encryption(pdf_file, password="4000"):
    """Encrypt a PDF in place with PyPDF2 (best effort).

    Every page of ``pdf_file`` is copied into a new writer, the writer is
    encrypted with ``password`` and written back over the same path.
    Failures are logged, not raised.

    Args:
        pdf_file: Path of the PDF to encrypt.
        password: Password applied to the PDF.
    """
    try:
        encrypted = PdfWriter()
        source = PdfReader(pdf_file)
        for sheet in source.pages:
            encrypted.add_page(sheet)
        encrypted.encrypt(password)
        with open(pdf_file, "wb") as target:
            encrypted.write(target)
        logger.debug("PDF加密成功")
    except Exception as e:
        logger.error(f"PDF加密失败: {e}")
def fetch_and_create_pdf(url):
"""Scrape the forum listing at ``url`` with undetected_chromedriver and build a PDF.

Loads the listing in Chrome (headless when not on Windows), fetches each
thread page with a ``requests`` session that reuses the browser's user agent
and cookies, and writes thread titles, magnet links and images into an A3
PDF under ``temp/JAV`` (three directory levels above this file). The
finished PDF is passed to ``add_pdf_encryption``.

Args:
url: Address of the forum listing page to scrape.

Returns:
The path of the generated PDF, or "" when anything in the scrape/build
sequence raises.
"""
driver = None
service = None
try:
options = uc.ChromeOptions()
# Settings intended to avoid bot detection.
# Headless mode is used everywhere except Windows.
if os.name != 'nt':
options.headless = True
options.add_argument('--headless=new') # new-style headless mode
else:
options.headless = False
options.add_argument('--no-sandbox')
options.add_argument('--disable-gpu')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-extensions')
options.add_argument('--disable-background-networking')
# Flags meant to help the browser process get cleaned up properly.
options.add_argument('--disable-crash-reporter')
options.add_argument('--disable-in-process-stack-traces')
options.add_argument('--disable-logging')
options.add_argument('--disable-dev-shm-usage')
# Create the driver instance.
# version_main pins the driver to Chrome major version 144, which the original
# comment says matches the Chrome installed on the server.
# NOTE(review): update this value when the server's Chrome is upgraded.
driver = uc.Chrome(options=options, version_main=144)
logger.info(f"正在访问: {url}")
driver.get(url)
# Give the Cloudflare interstitial time to finish, then handle the age-confirmation link.
time.sleep(8)
try:
enter_btn = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "满18岁请点此进入")]'))
)
enter_btn.click()
logger.debug("点击了年龄确认按钮")
time.sleep(3)
except Exception:
logger.debug("未发现年龄验证按钮,可能已过检测")
# Wait up to 20 seconds for the thread list to be present.
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'tbody[id^="normalthread"]'))
)
# Extract the thread rows that contain a span with class "xi1", in reverse order.
soup = BeautifulSoup(driver.page_source, 'html.parser')
posts = [p for p in soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')}) if
p.find('span', {'class': 'xi1'})]
today_posts = posts[::-1]
# Register the Chinese font; the path is relative to the current working directory.
pdfmetrics.registerFont(TTFont('SimHei', 'fonts/simhei.ttf'))
styles = getSampleStyleSheet()
title_style = styles['Heading1']
title_style.fontName = 'SimHei'
title_style.textColor = colors.red
normal_style = styles['Normal']
normal_style.fontName = 'SimHei'
# Output location: the temp/JAV directory.
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
save_path = os.path.join(base_dir, 'temp', 'JAV')
if not os.path.exists(save_path):
os.makedirs(save_path)
pdf_filename = os.path.join(save_path, f"JAV-{datetime.now().strftime('%Y-%m-%d')}-{len(today_posts)}.pdf")
doc = SimpleDocTemplate(pdf_filename, pagesize=A3)
content = []
# Maximum image size: the A3 page minus 72 points, scaled to 95% width and 70% height.
max_w, max_h = (A3[0] - 72) * 0.95, (A3[1] - 72) * 0.7
# Mirror the browser's user agent and cookies into a requests session.
session = requests.Session()
ua = driver.execute_script("return navigator.userAgent")
session.headers.update({'User-Agent': ua, 'Referer': 'https://www.sehuatang.net/'})
for c in driver.get_cookies():
session.cookies.set(c['name'], c['value'])
# Walk the posts.
for post in today_posts:
title_tag = post.find('a', {'class': 's xst'})
if not title_tag: continue
p_title = title_tag.get_text()
# A missing href makes this concatenation raise TypeError, which is handled by the
# outermost except (the function then returns "").
p_url = 'https://www.sehuatang.net/' + title_tag.get('href')
logger.info(f"详情页: {p_title}")
try:
resp = session.get(p_url, timeout=15)
p_soup = BeautifulSoup(resp.text, 'html.parser')
div = p_soup.find('div', {'class': 't_fsz'})
if div:
content.append(Paragraph(f" {p_title}", title_style))
# A magnet link ends at a space or a CJK character.
magnets = re.findall(r'magnet:\?[^ \u4e00-\u9fff]+', div.get_text())
for m in magnets:
content.append(Paragraph(f"<b>{m}</b>", normal_style))
# Only <img> tags whose "zoomfile" attribute contains "http" are downloaded.
for img_tag in div.find_all('img'):
src = img_tag.get('zoomfile')
if src and 'http' in src:
img_io = download_image(src, session)
if img_io:
with PILImage.open(img_io) as p_img:
iw, ih = p_img.size
# Shrink to fit the page, never enlarge.
sc = min(max_w / iw, max_h / ih, 1.0)
img_io.seek(0)
content.append(Image(img_io, width=iw * sc, height=ih * sc))
# Page break after every post except the last one.
if post != today_posts[-1]: content.append(PageBreak())
except Exception as e:
logger.error(f"帖子处理失败: {e}")
doc.build(content)
add_pdf_encryption(pdf_filename)
return pdf_filename
except Exception as e:
logger.exception(f"抓取异常: {e}")
return ""
finally:
# --- Make sure the Chrome process is fully shut down ---
if driver:
try:
logger.debug("正在安全关闭浏览器...")
# Close the current window first.
try:
driver.close()
except Exception as e:
logger.warning(f"关闭浏览器窗口时出错: {e}")
# Then quit the driver.
driver.quit()
logger.debug("浏览器已完全关闭")
except Exception as e:
logger.error(f"关闭浏览器时出错: {e}")
# Extra safety net: kill leftover Chrome processes (non-Windows only).
if os.name != 'nt':
try:
import psutil
# NOTE(review): os.getlogin() presumably needs a controlling terminal; if it
# raises, the generic handler below logs a warning and the cleanup is skipped.
current_user = os.getlogin()
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'username']):
try:
if proc.info['name'] and 'chrome' in proc.info['name'].lower():
if proc.info['username'] == current_user:
# Only processes whose command line contains this marker are killed.
# NOTE(review): confirm the marker matches the profile directory that
# undetected_chromedriver actually uses.
cmdline = proc.info.get('cmdline', [])
if cmdline and any('--user-data-dir=/tmp/playwright' in str(cmd) for cmd in cmdline):
logger.info(f"强制终止残留Chrome进程: PID={proc.info['pid']}")
proc.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except ImportError:
logger.debug("未安装psutil跳过强制清理")
except Exception as e:
logger.warning(f"强制清理Chrome进程时出错: {e}")
def pdf_file_path_undetected():
    """Generate the forum PDF via undetected_chromedriver and report its path.

    Returns:
        ``(True, pdf_path)`` when the PDF was generated, otherwise
        ``(False, default_path)`` where ``default_path`` is ``default.pdf``
        next to this file.
    """
    try:
        target = 'https://www.sehuatang.net/forum.php?mod=forumdisplay&fid=103&filter=typeid&typeid=481'
        result = fetch_and_create_pdf(target)
        if result:
            logger.info(f"返回的PDF文件路径{result}")
            outcome = (True, result)
        else:
            # Generation reported failure with an empty path: use the placeholder.
            module_dir = os.path.dirname(os.path.abspath(__file__))
            placeholder = os.path.join(module_dir, "default.pdf")
            logger.info(f"PDF生成失败返回默认路径: {placeholder}")
            outcome = (False, placeholder)
        return outcome
    except Exception as e:
        logger.error(f"生成PDF路径时出错: {e}")
        # Unexpected error: fall back to the placeholder as well.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        return False, os.path.join(module_dir, "default.pdf")


if __name__ == "__main__":
    pdf_file_path_undetected()