Files
abot/plugins/sehuatang_push/shehuatang_undetected.py
liuwei a8070a7214 支持复用色花堂常驻浏览器会话
- 为 sehuatang_push 增加远程调试端口附着能力,优先复用常驻 Chrome 浏览器\n- 区分外部浏览器与自管理浏览器,避免任务结束时误关闭用户正在使用的浏览器\n- 从插件配置和任务 payload 读取浏览器复用参数,并补充 browser 配置项说明
2026-04-27 15:41:36 +08:00

400 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import os
import shutil
import subprocess
import requests
from io import BytesIO
import undetected_chromedriver as uc
from selenium import webdriver
# 注意不要禁用析构函数否则会导致Chrome进程泄漏
# if os.name == 'nt':
# try:
# uc.Chrome.__del__ = lambda self: None
# except Exception:
# pass
from selenium.webdriver.common.by import By
from selenium.common.exceptions import SessionNotCreatedException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from reportlab.lib.pagesizes import A3
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfbase import pdfmetrics
from datetime import datetime
from PIL import Image as PILImage
import re
from PyPDF2 import PdfReader, PdfWriter
from loguru import logger
def _build_chrome_options(debugger_address=None):
"""构建 Chrome 启动参数;当提供调试地址时,表示附着到外部常驻浏览器。"""
# 这里统一使用 Selenium 的 ChromeOptions
# 这样既能给 undetected_chromedriver 复用,也能给 Selenium 直连现有浏览器复用。
options = webdriver.ChromeOptions()
if debugger_address:
# 当需要复用已经启动的浏览器时,只需要告诉 ChromeDriver 去连接哪个调试地址。
# 这种模式下不会新拉起浏览器进程,因此也不应该再塞 headless 等启动参数。
options.add_experimental_option("debuggerAddress", debugger_address)
return options
# 下面这组参数用于“自己启动浏览器”的场景。
# Linux 服务器上继续使用 headless避免任务依赖桌面环境。
if os.name != 'nt':
options.headless = True
options.add_argument('--headless=new')
else:
options.headless = False
options.add_argument('--no-sandbox')
options.add_argument('--disable-gpu')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-extensions')
options.add_argument('--disable-background-networking')
options.add_argument('--disable-crash-reporter')
options.add_argument('--disable-in-process-stack-traces')
options.add_argument('--disable-logging')
options.add_argument('--disable-dev-shm-usage')
return options
def _normalize_browser_config(browser_config=None):
"""整理浏览器配置,保证后续逻辑总能拿到结构稳定的字典。"""
browser_config = browser_config or {}
return {
"reuse_existing_browser": bool(browser_config.get("reuse_existing_browser", False)),
"debugger_host": str(browser_config.get("debugger_host", "127.0.0.1") or "127.0.0.1").strip(),
"debugger_port": int(browser_config.get("debugger_port", 9222) or 9222),
"allow_launch_fallback": bool(browser_config.get("allow_launch_fallback", True)),
}
def _probe_existing_browser(debugger_host, debugger_port):
"""探测常驻浏览器调试端口是否可用,并返回浏览器端的元信息。"""
version_url = f"http://{debugger_host}:{debugger_port}/json/version"
response = requests.get(version_url, timeout=5)
response.raise_for_status()
return response.json()
def _detect_local_chrome_major_version():
"""检测本机 Chrome/Chromium 主版本号,尽量让 ChromeDriver 跟浏览器版本保持一致。"""
# 这里按不同平台准备一组常见的 Chrome/Chromium 可执行文件位置。
# 这样做的目的,是避免把 driver 版本写死后,浏览器一升级就再次出现版本不兼容。
candidate_paths = []
if os.name == 'nt':
candidate_paths.extend([
os.path.join(os.environ.get("PROGRAMFILES", ""), "Google", "Chrome", "Application", "chrome.exe"),
os.path.join(os.environ.get("PROGRAMFILES(X86)", ""), "Google", "Chrome", "Application", "chrome.exe"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Google", "Chrome", "Application", "chrome.exe"),
])
candidate_paths.extend([
shutil.which("chrome"),
shutil.which("chrome.exe"),
])
else:
candidate_paths.extend([
shutil.which("google-chrome"),
shutil.which("google-chrome-stable"),
shutil.which("chromium"),
shutil.which("chromium-browser"),
shutil.which("chrome"),
])
# 依次尝试执行 `--version`,只要拿到类似 `147.0.7727.116` 的版本串,就提取主版本号返回。
for chrome_path in candidate_paths:
if not chrome_path or not os.path.exists(chrome_path):
continue
try:
version_output = subprocess.check_output(
[chrome_path, "--version"],
stderr=subprocess.STDOUT,
text=True,
timeout=5,
).strip()
version_match = re.search(r"(\d+)\.", version_output)
if version_match:
return int(version_match.group(1))
except Exception as exc:
logger.debug(f"检测浏览器版本失败,路径={chrome_path},原因={exc}")
return None
def _extract_browser_major_version_from_error(error_message):
"""从 Selenium/ChromeDriver 报错中提取当前浏览器主版本号,用于兜底重试。"""
# ChromeDriver 版本不匹配时,报错里通常会带 `Current browser version is 147.x.x.x` 这样的信息。
# 这里把这个版本号解析出来,便于在首次启动失败后自动切换到正确版本再试一次。
version_match = re.search(r"Current browser version is (\d+)\.", error_message or "")
if version_match:
return int(version_match.group(1))
return None
def _create_chrome_driver(options):
"""创建 undetected_chromedriver 实例,并在版本不匹配时自动兜底重试一次。"""
detected_major_version = _detect_local_chrome_major_version()
chrome_kwargs = {"options": options}
# 优先使用本机已安装浏览器的主版本号,避免继续使用过期的 driver 版本。
if detected_major_version:
chrome_kwargs["version_main"] = detected_major_version
logger.info(f"检测到本机 Chrome/Chromium 主版本: {detected_major_version}")
else:
logger.warning("未检测到本机 Chrome/Chromium 版本,将交给 undetected_chromedriver 自动处理")
try:
return uc.Chrome(**chrome_kwargs)
except SessionNotCreatedException as exc:
# 如果首次启动失败,并且报错里明确告诉了当前浏览器版本,就按真实版本重试一次。
# 这样即便服务器上实际启动的是另一个 Chrome 可执行文件,也能自动修正 driver 版本。
retry_major_version = _extract_browser_major_version_from_error(str(exc))
if retry_major_version and retry_major_version != detected_major_version:
logger.warning(
f"ChromeDriver 与浏览器版本不匹配,准备按浏览器主版本 {retry_major_version} 自动重试一次"
)
return uc.Chrome(options=options, version_main=retry_major_version)
raise
def _attach_to_existing_browser(browser_config):
"""附着到已经启动的 Chrome 调试会话,避免重复创建和管理浏览器进程。"""
debugger_host = browser_config["debugger_host"]
debugger_port = browser_config["debugger_port"]
debugger_address = f"{debugger_host}:{debugger_port}"
# 先探测调试端口,能提前把“端口没开”与“连接成功”的原因写清楚,排查会更轻松。
browser_meta = _probe_existing_browser(debugger_host, debugger_port)
browser_version = browser_meta.get("Browser", "未知版本")
logger.info(f"准备复用常驻浏览器: {debugger_address},浏览器信息: {browser_version}")
options = _build_chrome_options(debugger_address=debugger_address)
driver = webdriver.Chrome(options=options)
return driver
def _create_browser_session(browser_config=None):
"""根据配置决定是复用常驻浏览器,还是回退到自管理浏览器。"""
normalized_config = _normalize_browser_config(browser_config)
if normalized_config["reuse_existing_browser"]:
try:
driver = _attach_to_existing_browser(normalized_config)
# attached_to_existing_browser=True 表示后续清理时不能去关闭用户自己的常驻浏览器。
return driver, True
except Exception as exc:
if not normalized_config["allow_launch_fallback"]:
logger.error(f"复用常驻浏览器失败,且已禁止启动备用浏览器: {exc}")
raise
logger.warning(f"复用常驻浏览器失败,准备回退到自管理浏览器: {exc}")
options = _build_chrome_options()
driver = _create_chrome_driver(options)
return driver, False
def download_image(url, session):
"""使用同步的 session 下载图片,确保 Cookie 一致"""
try:
if not url.lower().endswith(('.jpg', '.jpeg', '.png')):
return None
response = session.get(url, timeout=15)
response.raise_for_status()
return BytesIO(response.content)
except Exception as e:
logger.warning(f"下载图片失败: {e}")
return None
def add_pdf_encryption(pdf_file, password="4000"):
try:
pdf_writer = PdfWriter()
pdf_reader = PdfReader(pdf_file)
for page in pdf_reader.pages:
pdf_writer.add_page(page)
pdf_writer.encrypt(password)
with open(pdf_file, "wb") as output_pdf:
pdf_writer.write(output_pdf)
logger.debug("PDF加密成功")
except Exception as e:
logger.error(f"PDF加密失败: {e}")
def fetch_and_create_pdf(url, browser_config=None):
driver = None
attached_to_existing_browser = False
try:
# 优先复用外部常驻浏览器,避免插件自己创建和管理浏览器进程;
# 如果未启用复用,或者复用失败但允许回退,则继续使用原来的自管理浏览器方案。
driver, attached_to_existing_browser = _create_browser_session(browser_config)
logger.info(f"正在访问: {url}")
driver.get(url)
# 等待 Cloudflare 5秒盾结束并处理“满18岁”按钮
time.sleep(8)
try:
enter_btn = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "满18岁请点此进入")]'))
)
enter_btn.click()
logger.debug("点击了年龄确认按钮")
time.sleep(3)
except Exception:
logger.debug("未发现年龄验证按钮,可能已过检测")
# 确保列表加载
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'tbody[id^="normalthread"]'))
)
# 提取数据
soup = BeautifulSoup(driver.page_source, 'html.parser')
posts = [p for p in soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')}) if
p.find('span', {'class': 'xi1'})]
today_posts = posts[::-1]
# 字体注册
pdfmetrics.registerFont(TTFont('SimHei', 'fonts/simhei.ttf'))
styles = getSampleStyleSheet()
title_style = styles['Heading1']
title_style.fontName = 'SimHei'
title_style.textColor = colors.red
normal_style = styles['Normal']
normal_style.fontName = 'SimHei'
# 路径逻辑 - 保存到 temp/JAV 目录
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
save_path = os.path.join(base_dir, 'temp', 'JAV')
if not os.path.exists(save_path):
os.makedirs(save_path)
pdf_filename = os.path.join(save_path, f"JAV-{datetime.now().strftime('%Y-%m-%d')}-{len(today_posts)}.pdf")
doc = SimpleDocTemplate(pdf_filename, pagesize=A3)
content = []
max_w, max_h = (A3[0] - 72) * 0.95, (A3[1] - 72) * 0.7
# 同步 Session
session = requests.Session()
ua = driver.execute_script("return navigator.userAgent")
session.headers.update({'User-Agent': ua, 'Referer': 'https://www.sehuatang.net/'})
for c in driver.get_cookies():
session.cookies.set(c['name'], c['value'])
# 循环帖子
for post in today_posts:
title_tag = post.find('a', {'class': 's xst'})
if not title_tag: continue
p_title = title_tag.get_text()
p_url = 'https://www.sehuatang.net/' + title_tag.get('href')
logger.info(f"详情页: {p_title}")
try:
resp = session.get(p_url, timeout=15)
p_soup = BeautifulSoup(resp.text, 'html.parser')
div = p_soup.find('div', {'class': 't_fsz'})
if div:
content.append(Paragraph(f" {p_title}", title_style))
magnets = re.findall(r'magnet:\?[^ \u4e00-\u9fff]+', div.get_text())
for m in magnets:
content.append(Paragraph(f"<b>{m}</b>", normal_style))
for img_tag in div.find_all('img'):
src = img_tag.get('zoomfile')
if src and 'http' in src:
img_io = download_image(src, session)
if img_io:
with PILImage.open(img_io) as p_img:
iw, ih = p_img.size
sc = min(max_w / iw, max_h / ih, 1.0)
img_io.seek(0)
content.append(Image(img_io, width=iw * sc, height=ih * sc))
if post != today_posts[-1]: content.append(PageBreak())
except Exception as e:
logger.error(f"帖子处理失败: {e}")
doc.build(content)
add_pdf_encryption(pdf_filename)
return pdf_filename
except Exception as e:
logger.exception(f"抓取异常: {e}")
return ""
finally:
if driver:
if attached_to_existing_browser:
try:
# 这里是“借用”用户已经在运行的浏览器,只释放当前 WebDriver 会话即可。
# 明确不执行 close(),避免把用户正在用的标签页关掉。
logger.debug("当前使用的是外部常驻浏览器,仅释放 WebDriver 会话,不关闭浏览器本体")
driver.quit()
except Exception as e:
logger.error(f"释放外部浏览器会话时出错: {e}")
else:
try:
logger.debug("正在安全关闭自管理浏览器...")
try:
driver.close()
except Exception as e:
logger.warning(f"关闭浏览器窗口时出错: {e}")
driver.quit()
logger.debug("浏览器已完全关闭")
except Exception as e:
logger.error(f"关闭浏览器时出错: {e}")
# 只有当本次浏览器由当前任务自己拉起时,才需要额外清理潜在残留进程。
if os.name != 'nt' and not attached_to_existing_browser:
try:
import psutil
current_user = os.getlogin()
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'username']):
try:
if proc.info['name'] and 'chrome' in proc.info['name'].lower():
if proc.info['username'] == current_user:
# 检查是否是本次启动的chrome进程通过命令行参数判断
cmdline = proc.info.get('cmdline', [])
if cmdline and any('--user-data-dir=/tmp/playwright' in str(cmd) for cmd in cmdline):
logger.info(f"强制终止残留Chrome进程: PID={proc.info['pid']}")
proc.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except ImportError:
logger.debug("未安装psutil跳过强制清理")
except Exception as e:
logger.warning(f"强制清理Chrome进程时出错: {e}")
def pdf_file_path_undetected(browser_config=None):
try:
url = 'https://www.sehuatang.net/forum.php?mod=forumdisplay&fid=103&filter=typeid&typeid=481'
# 将插件配置透传给抓取函数,便于优先复用外部常驻浏览器会话。
pdf_path = fetch_and_create_pdf(url, browser_config=browser_config)
if pdf_path:
logger.info(f"返回的PDF文件路径{pdf_path}")
return True, pdf_path
else:
# 如果生成失败返回一个默认的PDF路径
default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default.pdf")
logger.info(f"PDF生成失败返回默认路径: {default_path}")
return False, default_path
except Exception as e:
logger.error(f"生成PDF路径时出错: {e}")
# 返回一个默认路径
default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default.pdf")
return False, default_path
if __name__ == "__main__":
pdf_file_path_undetected()