# Helper methods of DailyNewsPlugin (plugins/daily_news/main.py).

@staticmethod
def _get_baidu_news() -> str:
    """Fetch the Baidu real-time hot board and render it as numbered text.

    The request is blocking; the plugin calls this helper through
    ``asyncio.to_thread``.

    Returns:
        A header line with today's date and weekday, an empty line, and then
        one ``"<n> :#<title>"`` line per entry that carries a ``"word"``
        field.  Spaces inside a title are replaced with underscores.  Only
        the header (followed by the empty line) is returned when the payload
        holds no usable entries.

    Raises:
        requests.RequestException: on connection errors, timeouts, or a
            non-2xx status.  A body that is not JSON makes ``resp.json()``
            raise as well.
    """
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) "
            "Gecko/20100101 Firefox/110.0"
        )
    }
    url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
    weekdays = ("星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日")

    def _dict_items(value):
        """Return the dict members of *value*, or nothing if it is not a list."""
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, dict)]

    now = datetime.now()
    # Assemble the date from numeric fields rather than passing CJK text
    # through strftime(), whose handling of non-ASCII format strings is
    # platform/locale dependent.
    current_date = f"{now.year:04d}年{now.month:02d}月{now.day:02d}日"

    resp = requests.get(url, headers=headers, timeout=15)
    resp.raise_for_status()
    payload = resp.json()

    # The API may answer with null / unexpected types at any level; treat
    # those as "no entries" instead of failing with AttributeError.
    data = payload.get("data") if isinstance(payload, dict) else None
    cards = data.get("cards") if isinstance(data, dict) else None

    # The header keeps one trailing newline so that joining with "\n" yields
    # the blank separator line.
    lines = [f"当前日期:{current_date} {weekdays[now.weekday()]}\n"]
    for card in _dict_items(cards):
        for block in _dict_items(card.get("content")):
            for article in _dict_items(block.get("content")):
                if "word" not in article:
                    continue
                title = str(article.get("word", "")).strip().replace(" ", "_")
                # len(lines) is the 1-based index of the entry being added.
                lines.append(f"{len(lines)} :#{title}")
    return "\n".join(lines) + "\n"


@staticmethod
def _get_news_60s_image() -> Optional[str]:
    """Ask the "60s news" service for the URL of today's news image.

    Returns:
        The value stored under ``data.image`` in the JSON response, or
        ``None`` when the response does not have that shape.

    Raises:
        requests.RequestException: on connection errors, timeouts, or a
            non-2xx status.
    """
    # NOTE(review): the endpoint is a hard-coded private-network address;
    # consider reading it from plugin configuration.
    api_url = "http://192.168.2.32:4399/v2/60s"
    resp = requests.get(api_url, timeout=15)
    resp.raise_for_status()
    payload = resp.json()

    # {"data": null} or a non-dict payload must not raise AttributeError.
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        return None
    return data.get("image")
# Method of DailyRankingPlugin (plugins/daily_ranking/main.py).

async def _generate_and_send_ranking(self, group_id: str) -> Tuple[bool, str]:
    """Build yesterday's speech ranking for a group and credit the rewards.

    Despite the name, this method does not send anything: it returns the
    text and the caller forwards it to the group.

    Args:
        group_id: Identifier of the group whose ranking is generated.

    Returns:
        ``(True, ranking_text)`` when at least one row was ranked, otherwise
        ``(False, reason)``.

    Side effects:
        Calls ``points_db.add_points`` once per ranked member.
        NOTE(review): nothing in this method prevents a second run for the
        same day from crediting the rewards again — confirm whether
        ``add_points`` de-duplicates.
    """
    if not self.message_db or not self.points_db:
        return False, "排行依赖未初始化"

    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    rows = self.message_db.get_speech_ranking(yesterday, group_id, limit=20)
    # A row without a wx_id can be neither displayed nor credited.
    rows = [row for row in (rows or []) if row.get("wx_id")]
    if not rows:
        return False, f"📊 {yesterday} 没有发言记录"

    # (highest rank covered, badge, trailing decoration, reward points)
    tiers = (
        (1, "🥇🐲", " 🔥", 30),
        (2, "🥈", " ✨", 20),
        (3, "🥉", " 👏", 10),
        (10, "🌟", "", 5),
    )
    beyond_top_ten = ("👍", "", 3)

    contact_manager = ContactManager.get_instance()
    ranking_lines = [f"🏆 {yesterday} 发言排行榜 🏆"]
    for rank, row in enumerate(rows, start=1):
        wxid = row.get("wx_id")
        speech_count = int(row.get("speech_count") or 0)
        # Fall back to the raw id when no group nickname is known.
        display_name = contact_manager.get_group_name(group_id, wxid) or wxid

        badge, flourish, reward = next(
            (tier[1:] for tier in tiers if rank <= tier[0]),
            beyond_top_ten,
        )
        ranking_lines.append(
            f"{badge} {rank}.{display_name}: {speech_count}次{flourish} +{reward}积分"
        )
        # Every tier carries a positive reward, so the credit is unconditional.
        self.points_db.add_points(
            wxid,
            group_id,
            reward,
            PointSource.OTHER,
            f"{yesterday}发言排行第{rank}名奖励",
        )

    return True, "\n".join(ranking_lines)


# Helper methods of EpicFreePlugin (plugins/epic_free/main.py).

@staticmethod
def _is_friday() -> bool:
    """Return True when the local date is a Friday (``weekday() == 4``)."""
    return datetime.today().weekday() == 4


@staticmethod
def _get_free_games() -> str:
    """Scrape the free-game table from steamstats.cn and format it as text.

    Returns:
        A header line with the Epic free-games link followed by one
        eight-line record per table row that has at least seven cells.  When
        the page has no ``<tbody>`` or no usable row, the header is followed
        by a "nothing found" notice instead.

    Raises:
        requests.RequestException: on connection errors, timeouts, or a
            non-2xx status.
    """
    url = "https://steamstats.cn/xi"
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36 Edg/90.0.818.41"
        )
    }
    header = "今日喜加一 :https://store.epicgames.com/en-US/free-games\n"
    nothing_found = "未抓取到免费游戏列表"

    def _cell_text(node) -> str:
        """Return the stripped text of a tag, descending into nested markup."""
        # Tag.string is None as soon as a cell has more than one child;
        # get_text() still returns the visible text in that case.
        return node.get_text(strip=True) if node is not None else ""

    resp = requests.get(url, headers=headers, timeout=20)
    resp.raise_for_status()
    resp.encoding = resp.apparent_encoding
    soup = BeautifulSoup(resp.text, "html.parser")

    tbody = soup.find("tbody")
    if not tbody:
        return header + nothing_found

    records = []
    for row in tbody.find_all("tr"):
        cols = row.find_all("td")
        if len(cols) < 7:
            continue
        name = _cell_text(cols[1])
        gametype = _cell_text(cols[2]).replace(" ", "")
        start = _cell_text(cols[3]).replace(" ", "")
        end = _cell_text(cols[4]).replace(" ", "")
        permanent = _cell_text(cols[5]).replace(" ", "")
        origin = _cell_text(cols[6].find("span")).replace(" ", "")

        # Keep the last non-empty href found in the platform cell.
        href_value = ""
        for anchor in cols[6].find_all("a"):
            href_value = anchor.get("href", "") or href_value

        records.append(
            f"序号:{len(records) + 1}\n"
            f"游戏名称:{name}\n"
            f"DLC/game:{gametype}\n"
            f"开始时间:{start}\n"
            f"结束时间:{end}\n"
            f"是否永久:{permanent}\n"
            f"平台:{origin}\n"
            f"URL:{href_value}\n"
        )

    if not records:
        return header + nothing_found
    return header + "".join(records)
# Definitions of plugins/sehuatang_push/shehuatang.py (module imports unchanged).

def download_image(url, timeout=15):
    """Download a JPG/JPEG/PNG image into memory.

    Args:
        url: Image address.  Falsy values and addresses that do not end in
            ``.jpg``, ``.jpeg`` or ``.png`` (case-insensitive) are rejected
            without touching the network.
        timeout: Seconds to wait for the server; without it a stalled
            connection would block the whole PDF run.

    Returns:
        ``io.BytesIO`` holding the response body, or ``None`` when the URL is
        rejected or the request fails.  No size filtering is applied.
    """
    if not url or not url.lower().endswith(('.jpg', '.jpeg', '.png')):
        return None

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://tu.a7nz4.us',
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.warning(f"下载图片失败: {e}")
        return None
    return BytesIO(response.content)


def add_pdf_encryption(pdf_file, password="4000"):
    """Encrypt *pdf_file* in place with PyPDF2.

    The encrypted copy is written to a temporary sibling file and then moved
    over the original, so a failure part-way through leaves the original PDF
    intact.  Errors are logged, not raised.

    Args:
        pdf_file: Path of the PDF to protect.
        password: Password required to open the PDF.
    """
    tmp_file = f"{pdf_file}.enc.tmp"
    try:
        pdf_reader = PdfReader(pdf_file)
        pdf_writer = PdfWriter()
        for page in pdf_reader.pages:
            pdf_writer.add_page(page)
        pdf_writer.encrypt(password)
        with open(tmp_file, "wb") as output_pdf:
            pdf_writer.write(output_pdf)
        os.replace(tmp_file, pdf_file)
        # The password is deliberately kept out of the log.
        logger.debug("PDF加密成功")
    except Exception as e:
        logger.error(f"PDF加密失败: {e}")
        if os.path.exists(tmp_file):
            try:
                os.remove(tmp_file)
            except OSError:
                pass


def _create_chrome_driver():
    """Start a headless Chrome WebDriver, falling back to webdriver_manager."""
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-logging')
    options.add_argument('--log-level=3')
    options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])

    if os.name == 'nt':
        # NOTE(review): two dirname() calls from this file give the directory
        # above this package, so the bundled driver is looked up under
        # "<that directory>/utils/chromedriver" — confirm that this is where
        # chromedriver.exe lives.
        chrome_driver_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "utils", "chromedriver", "chromedriver.exe"
        )
    else:
        chrome_driver_path = '/usr/bin/chromedriver'

    try:
        if os.name == 'nt' and not os.path.exists(chrome_driver_path):
            chrome_driver_path = ChromeDriverManager().install()
        service = Service(chrome_driver_path, log_path=os.devnull)
        return webdriver.Chrome(service=service, options=options)
    except Exception as e:
        # Second attempt with a driver fetched by webdriver_manager.
        logger.debug(f"初始化ChromeDriver失败: {e}")
        chrome_driver_path = ChromeDriverManager().install()
        service = Service(chrome_driver_path, log_path=os.devnull)
        return webdriver.Chrome(service=service, options=options)


def fetch_and_create_pdf(url):
    """Scrape the forum listing at *url* and render its posts into a PDF.

    For every listed post that carries a ``span.xi1`` marker, the post page
    is fetched, its magnet links and ``zoomfile`` images are collected, and
    one PDF section is produced.  The PDF is written to
    ``<project root>/temp/JAV/JAV-<date>-<count>.pdf`` and then encrypted.

    Args:
        url: Address of the forum listing page.

    Returns:
        Absolute path of the generated PDF, or ``""`` on any failure.
    """
    # Function-scope imports: the module's import block is left untouched.
    from urllib.parse import urljoin
    from xml.sax.saxutils import escape

    age_gate_xpath = '//a[contains(text(), "满18岁,请点此进入")]'
    driver = None
    try:
        driver = _create_chrome_driver()

        driver.get(url)
        try:
            enter_button = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, age_gate_xpath)))
            enter_button.click()
            logger.debug("点击了满18岁按钮")
        except Exception as e:
            logger.warning(f"未找到满18岁按钮,跳过此步骤: {e}")
        WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'tbody[id^="normalthread"]')))

        # Second look for the age gate once the listing is present.
        try:
            enter_button = driver.find_element(By.XPATH, age_gate_xpath)
            enter_button.click()
            logger.debug("点击了满18岁按钮")
            time.sleep(5)
        except Exception as e:
            logger.warning(f"未找到满18岁按钮,跳过此步骤: {e}")

        # page_source is already str, so no from_encoding is passed
        # (BeautifulSoup ignores it for Unicode markup and warns).
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        posts = soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')})

        today = datetime.now().strftime('%Y-%m-%d')

        # Register a CJK-capable font; the path is relative to the process
        # working directory.
        pdfmetrics.registerFont(TTFont('SimHei', 'fonts/simhei.ttf'))
        styles = getSampleStyleSheet()

        title_style = styles['Heading1']
        title_style.fontName = 'SimHei'
        title_style.fontSize = 14
        title_style.textColor = colors.red
        title_style.bold = True

        normal_style = styles['Normal']
        normal_style.fontName = 'SimHei'
        normal_style.fontSize = 14

        content = []

        # Keep posts that carry the span.xi1 marker, oldest first.
        today_posts = [post for post in posts if post.find('span', {'class': 'xi1'}) is not None]
        today_posts.reverse()

        # Output goes to <three directories above this file>/temp/JAV.
        base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        pdf_filename = os.path.join(base_dir, 'temp', 'JAV', f"JAV-{today}-{len(today_posts)}.pdf")
        os.makedirs(os.path.dirname(pdf_filename), exist_ok=True)
        doc = SimpleDocTemplate(pdf_filename, pagesize=A3)

        # Images are limited to a fraction of the printable area so that
        # text and spacing still fit on the page.
        page_width, page_height = A3
        content_width = page_width - doc.rightMargin - doc.leftMargin
        content_height = page_height - doc.topMargin - doc.bottomMargin
        max_image_width = content_width * 0.95
        max_image_height = content_height * 0.7

        # Reuse the browser's cookies for plain HTTP requests.
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.sehuatang.net/'
        })
        for c in driver.get_cookies():
            try:
                session.cookies.set(c['name'], c['value'], domain=c.get('domain'), path=c.get('path', '/'))
            except Exception:
                session.cookies.set(c['name'], c['value'])

        last_index = len(today_posts) - 1
        for position, post in enumerate(today_posts):
            title = post.find('a', {'class': 's xst'})
            if title is None:
                continue
            post_title = title.get_text()
            post_url = title.get('href')
            logger.info(post_title)
            if not post_url:
                # A missing href used to raise TypeError and abort the run.
                logger.warning(f"帖子缺少链接,已跳过: {post_title}")
                continue

            post_page_url = urljoin('https://www.sehuatang.net/', post_url)
            try:
                resp = session.get(post_page_url, timeout=15)
                resp.raise_for_status()
                post_html = resp.text
            except Exception as e:
                logger.warning(f"获取帖子内容失败: {e}")
                continue

            post_soup = BeautifulSoup(post_html, 'html.parser')
            content_div = post_soup.find('div', {'class': 't_fsz'})
            if content_div is None:
                continue

            post_text = content_div.get_text(strip=True)
            magnet_links = re.findall(r'magnet:\?[^ \u4e00-\u9fff]+', post_text)

            # Paragraph parses its text as XML-like markup, so scraped text
            # is escaped before it is embedded.
            content.append(Paragraph(f" {escape(post_title)}", title_style))
            content.append(Spacer(1, 5))

            for magnet_link in magnet_links:
                # NOTE(review): in the reviewed copy this literal is split
                # across lines around the link, which suggests <br/> markup
                # was stripped during extraction — confirm against the
                # repository.
                content.append(Paragraph(f"<br/>{escape(magnet_link)}<br/>", normal_style))
                content.append(Spacer(1, 12))

            image_links = []
            for img in content_div.find_all('img'):
                zoomfile = img.get('zoomfile')
                if zoomfile and 'http' in zoomfile:
                    image_links.append(zoomfile)

            for img_link in image_links:
                image = download_image(img_link)
                if image is None:
                    continue
                try:
                    with PILImage.open(image) as pil_image:
                        img_width, img_height = pil_image.size
                    # Scale down to fit the page, never up.
                    scale = min(max_image_width / img_width, max_image_height / img_height, 1.0)
                    new_width = img_width * scale
                    new_height = img_height * scale

                    # Hand ReportLab a fresh stream positioned at the start.
                    img_stream = BytesIO(image.getvalue())
                    content.append(Image(img_stream, width=new_width, height=new_height))
                    content.append(Spacer(1, 4))
                    logger.debug(
                        f"处理图片: 原始尺寸 {img_width}x{img_height}, 新尺寸 {new_width}x{new_height}")
                except Exception as e:
                    logger.error(f"处理图片时出错: {e}")

            # Page break after every post except the last one.
            if position != last_index:
                content.append(PageBreak())

        try:
            doc.build(content)
            absolute_pdf_path = os.path.abspath(pdf_filename)
            logger.info(f"PDF saved as {absolute_pdf_path}")
            add_pdf_encryption(absolute_pdf_path)
            return absolute_pdf_path
        except Exception as e:
            logger.error(f"生成PDF时出错: {e}")
            return ""
    except Exception as e:
        logger.error(f"抓取帖子时出错: {e}")
        return ""
    finally:
        # Always shut the browser down, whatever happened above.
        if driver:
            try:
                driver.quit()
                logger.debug("Chrome driver已成功关闭")
            except Exception as e:
                logger.error(f"关闭Chrome driver时出错: {e}")
                # Last resort: terminate the driver process directly.
                try:
                    import psutil
                    process = psutil.Process(driver.service.process.pid)
                    process.terminate()
                    logger.debug("已强制终止Chrome进程")
                except Exception as e2:
                    logger.error(f"强制终止Chrome进程失败: {e2}")


def pdf_file_path():
    """Generate the daily PDF and report where it is.

    Returns:
        ``(True, pdf_path)`` on success; ``(False, default_path)`` otherwise,
        where ``default_path`` is ``default.pdf`` next to this module.
    """
    default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default.pdf")
    try:
        url = 'https://www.sehuatang.net/forum.php?mod=forumdisplay&fid=103&filter=typeid&typeid=481'
        pdf_path = fetch_and_create_pdf(url)
        if pdf_path:
            logger.info(f"返回的PDF文件路径:{pdf_path}")
            return True, pdf_path
        logger.info(f"PDF生成失败,返回默认路径: {default_path}")
        return False, default_path
    except Exception as e:
        logger.error(f"生成PDF路径时出错: {e}")
        return False, default_path


if __name__ == "__main__":
    pdf_file_path()
# Definitions of plugins/sehuatang_push/shehuatang_undetected.py
# (module imports unchanged).

def download_image(url, session):
    """Download a JPG/JPEG/PNG image through the cookie-carrying session.

    Args:
        url: Image address.  Falsy values and addresses that do not end in
            ``.jpg``, ``.jpeg`` or ``.png`` (case-insensitive) are rejected
            without touching the network.
        session: ``requests.Session`` primed with the browser's cookies.

    Returns:
        ``io.BytesIO`` with the image bytes, or ``None`` when the URL is
        rejected or the download fails.
    """
    if not url:
        return None
    try:
        if not url.lower().endswith(('.jpg', '.jpeg', '.png')):
            return None
        response = session.get(url, timeout=15)
        response.raise_for_status()
        return BytesIO(response.content)
    except Exception as e:
        logger.warning(f"下载图片失败: {e}")
        return None


def add_pdf_encryption(pdf_file, password="4000"):
    """Encrypt *pdf_file* in place with PyPDF2.

    The encrypted copy is written to a temporary sibling file and then moved
    over the original, so a failure part-way through leaves the original PDF
    intact.  Errors are logged, not raised.

    Args:
        pdf_file: Path of the PDF to protect.
        password: Password required to open the PDF.
    """
    tmp_file = f"{pdf_file}.enc.tmp"
    try:
        pdf_reader = PdfReader(pdf_file)
        pdf_writer = PdfWriter()
        for page in pdf_reader.pages:
            pdf_writer.add_page(page)
        pdf_writer.encrypt(password)
        with open(tmp_file, "wb") as output_pdf:
            pdf_writer.write(output_pdf)
        os.replace(tmp_file, pdf_file)
        logger.debug("PDF加密成功")
    except Exception as e:
        logger.error(f"PDF加密失败: {e}")
        if os.path.exists(tmp_file):
            try:
                os.remove(tmp_file)
            except OSError:
                pass


def _kill_leftover_browser(browser_pid):
    """Kill the browser started by this run if it survived ``driver.quit()``.

    Only the process with *browser_pid* and its children are touched; other
    Chrome instances belonging to the same user are left alone.
    """
    if not browser_pid:
        return
    try:
        import psutil
    except ImportError:
        logger.debug("未安装psutil,跳过强制清理")
        return

    try:
        browser = psutil.Process(browser_pid)
        # Guard against the PID having been reused by an unrelated program.
        if 'chrom' not in (browser.name() or '').lower():
            return
        for proc in browser.children(recursive=True) + [browser]:
            try:
                logger.info(f"强制终止残留Chrome进程: PID={proc.pid}")
                proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
    except psutil.NoSuchProcess:
        # Already gone: nothing to clean up.
        return
    except Exception as e:
        logger.warning(f"强制清理Chrome进程时出错: {e}")


def fetch_and_create_pdf(url, version_main=144):
    """Scrape the forum listing with undetected_chromedriver and build a PDF.

    For every listed post that carries a ``span.xi1`` marker, the post page
    is fetched through a ``requests`` session that reuses the browser's
    user agent and cookies; magnet links and ``zoomfile`` images are added to
    the PDF, which is written to ``<project root>/temp/JAV`` and encrypted.

    Args:
        url: Address of the forum listing page.
        version_main: Major Chrome version handed to
            ``undetected_chromedriver``.  Defaults to 144, the value that was
            previously hard-coded.

    Returns:
        Path of the generated PDF, or ``""`` on any failure.
    """
    # Function-scope imports: the module's import block is left untouched.
    from urllib.parse import urljoin
    from xml.sax.saxutils import escape

    driver = None
    browser_pid = None
    try:
        options = uc.ChromeOptions()
        # Headless everywhere except Windows.
        if os.name != 'nt':
            options.headless = True
            options.add_argument('--headless=new')
        else:
            options.headless = False

        for flag in (
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-extensions',
            '--disable-background-networking',
            '--disable-crash-reporter',
            '--disable-in-process-stack-traces',
            '--disable-logging',
        ):
            options.add_argument(flag)

        driver = uc.Chrome(options=options, version_main=version_main)
        # NOTE(review): browser_pid is read defensively; verify the attribute
        # against the installed undetected_chromedriver version.  When it is
        # absent the extra cleanup below is simply skipped.
        browser_pid = getattr(driver, "browser_pid", None)

        logger.info(f"正在访问: {url}")
        driver.get(url)

        # Give the anti-bot interstitial time to finish before interacting.
        time.sleep(8)

        try:
            enter_btn = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "满18岁,请点此进入")]'))
            )
            enter_btn.click()
            logger.debug("点击了年龄确认按钮")
            time.sleep(3)
        except Exception:
            logger.debug("未发现年龄验证按钮,可能已过检测")

        # Make sure the thread list has rendered.
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, 'tbody[id^="normalthread"]'))
        )

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        posts = [p for p in soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')}) if
                 p.find('span', {'class': 'xi1'})]
        today_posts = posts[::-1]

        # Register a CJK-capable font; the path is relative to the process
        # working directory.
        pdfmetrics.registerFont(TTFont('SimHei', 'fonts/simhei.ttf'))
        styles = getSampleStyleSheet()
        title_style = styles['Heading1']
        title_style.fontName = 'SimHei'
        title_style.textColor = colors.red
        normal_style = styles['Normal']
        normal_style.fontName = 'SimHei'

        # Output goes to <three directories above this file>/temp/JAV.
        base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        save_path = os.path.join(base_dir, 'temp', 'JAV')
        os.makedirs(save_path, exist_ok=True)
        pdf_filename = os.path.join(save_path, f"JAV-{datetime.now().strftime('%Y-%m-%d')}-{len(today_posts)}.pdf")

        doc = SimpleDocTemplate(pdf_filename, pagesize=A3)
        content = []
        # NOTE(review): 72 is a fixed allowance, not derived from the
        # document's actual margins.
        max_w, max_h = (A3[0] - 72) * 0.95, (A3[1] - 72) * 0.7

        # Mirror the browser's identity in a plain HTTP session.
        session = requests.Session()
        ua = driver.execute_script("return navigator.userAgent")
        session.headers.update({'User-Agent': ua, 'Referer': 'https://www.sehuatang.net/'})
        for c in driver.get_cookies():
            session.cookies.set(c['name'], c['value'])

        last_index = len(today_posts) - 1
        for position, post in enumerate(today_posts):
            title_tag = post.find('a', {'class': 's xst'})
            if not title_tag:
                continue

            p_title = title_tag.get_text()
            href = title_tag.get('href')
            if not href:
                # A missing href used to raise TypeError and abort the run.
                logger.warning(f"帖子缺少链接,已跳过: {p_title}")
                continue
            p_url = urljoin('https://www.sehuatang.net/', href)
            logger.info(f"详情页: {p_title}")

            try:
                resp = session.get(p_url, timeout=15)
                # Do not parse error pages as if they were posts.
                resp.raise_for_status()
                p_soup = BeautifulSoup(resp.text, 'html.parser')
                div = p_soup.find('div', {'class': 't_fsz'})

                if div:
                    # Paragraph parses its text as XML-like markup, so
                    # scraped text is escaped before it is embedded.
                    content.append(Paragraph(f" {escape(p_title)}", title_style))
                    magnets = re.findall(r'magnet:\?[^ \u4e00-\u9fff]+', div.get_text())
                    for m in magnets:
                        content.append(Paragraph(f"{escape(m)}", normal_style))

                    for img_tag in div.find_all('img'):
                        src = img_tag.get('zoomfile')
                        if not (src and 'http' in src):
                            continue
                        img_io = download_image(src, session)
                        if not img_io:
                            continue
                        # One unreadable image must not drop the rest of the post.
                        try:
                            raw = img_io.getvalue()
                            with PILImage.open(BytesIO(raw)) as p_img:
                                iw, ih = p_img.size
                            sc = min(max_w / iw, max_h / ih, 1.0)
                            content.append(Image(BytesIO(raw), width=iw * sc, height=ih * sc))
                        except Exception as e:
                            logger.error(f"处理图片时出错: {e}")

                # Page break after every post except the last one.
                if position != last_index:
                    content.append(PageBreak())
            except Exception as e:
                logger.error(f"帖子处理失败: {e}")

        doc.build(content)
        add_pdf_encryption(pdf_filename)
        return pdf_filename

    except Exception as e:
        logger.exception(f"抓取异常: {e}")
        return ""
    finally:
        if driver:
            try:
                logger.debug("正在安全关闭浏览器...")
                # Close the window first; quit() then ends the session.
                try:
                    driver.close()
                except Exception as e:
                    logger.warning(f"关闭浏览器窗口时出错: {e}")

                driver.quit()
                logger.debug("浏览器已完全关闭")
            except Exception as e:
                logger.error(f"关闭浏览器时出错: {e}")

            # Extra safety net, kept non-Windows only as before: target the
            # browser this run started instead of scanning every Chrome
            # process of the user.
            if os.name != 'nt':
                _kill_leftover_browser(browser_pid)


def pdf_file_path_undetected():
    """Generate the daily PDF via undetected_chromedriver and report its path.

    Returns:
        ``(True, pdf_path)`` on success; ``(False, default_path)`` otherwise,
        where ``default_path`` is ``default.pdf`` next to this module.
    """
    default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default.pdf")
    try:
        url = 'https://www.sehuatang.net/forum.php?mod=forumdisplay&fid=103&filter=typeid&typeid=481'
        pdf_path = fetch_and_create_pdf(url)
        if pdf_path:
            logger.info(f"返回的PDF文件路径:{pdf_path}")
            return True, pdf_path
        logger.info(f"PDF生成失败,返回默认路径: {default_path}")
        return False, default_path
    except Exception as e:
        logger.error(f"生成PDF路径时出错: {e}")
        return False, default_path


if __name__ == "__main__":
    pdf_file_path_undetected()