优化内容

This commit is contained in:
liuwei
2025-05-20 16:10:48 +08:00
parent e6af84cd1e
commit 55d95c11be
3 changed files with 2 additions and 3 deletions

View File

@@ -0,0 +1,216 @@
import random
from loguru import logger
import requests
from bs4 import BeautifulSoup
import time
import os
import re
from urllib.parse import urljoin
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from PIL import Image
from io import BytesIO
# Default HTTP headers shared by the requests calls in this module.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36',
    'Referer': 'https://www.mntuce.com/',
}

# URLs of posts that fetch_posts has already looked at in this process.
seen_posts = set()

# Module-wide root directory for downloads.
download_root = "/mnt/nfs_share"
def fetch_posts(base_url, dl_path, posts_per_batch=10):
    """Collect up to ``posts_per_batch`` posts that have not been downloaded yet.

    Listing pages are requested one after another (``base_url`` first, then
    ``<base_url>/page/N``). A post is skipped when its URL is already in the
    module-level ``seen_posts`` set or when its target folder already exists
    under ``dl_path``. Every post URL that is examined is added to
    ``seen_posts``.

    Args:
        base_url: URL of the first listing page.
        dl_path: Directory that holds one sub-folder per downloaded post.
        posts_per_batch: Maximum number of posts to return.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts, possibly shorter than
        ``posts_per_batch`` when the listing runs out or a request fails.
    """
    # Matches "No.123" / "Vol.123" (any letter case) and captures the number.
    issue_pattern = re.compile(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)')
    # Avoid "//page/N" when the caller passes a URL with a trailing slash.
    listing_root = base_url.rstrip('/')

    posts = []
    page = 1
    while len(posts) < posts_per_batch:
        if page > 1:
            # Pause only between requests, not after the batch is complete.
            time.sleep(1)
        url = f"{listing_root}/page/{page}" if page > 1 else base_url
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.info(f"请求 {url} 失败: {e}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        post_elements = soup.select('posts.posts-item.card h2.item-heading a')
        if not post_elements:
            logger.info(f"页面 {page} 未找到帖子,停止爬取")
            break

        for post in post_elements:
            href = post.get('href')
            if not href:
                # An anchor without a target would otherwise resolve to base_url.
                continue
            post_url = urljoin(base_url, href)
            if post_url in seen_posts:
                continue
            seen_posts.add(post_url)

            post_title = post.get_text().strip()
            match = issue_pattern.search(post_title)
            # Fall back to the title, which is the folder name the downloader
            # uses when the title carries no issue number.
            folder_name = match.group(1) if match else post_title
            if folder_name and os.path.exists(os.path.join(dl_path, folder_name)):
                # Folder already present: treat the post as downloaded.
                continue

            posts.append({'title': post_title, 'url': post_url})
            if len(posts) >= posts_per_batch:
                break
        page += 1
    return posts
def get_total_pages(post_url):
    """Return the highest page number linked from a post's pagination links.

    The post page is fetched and the numeric labels of
    ``p.post-nav-links a.post-page-numbers`` anchors are read. When the
    request fails or no numeric label is found, 1 is returned.
    """
    try:
        resp = requests.get(post_url, headers=headers, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as err:
        logger.info(f"请求 {post_url} 失败默认1页: {err}")
        return 1

    document = BeautifulSoup(resp.text, 'html.parser')
    numbers = []
    for anchor in document.select('p.post-nav-links a.post-page-numbers'):
        label = anchor.text
        if label.isdigit():
            numbers.append(int(label))

    if not numbers:
        return 1
    return max(numbers)
def fetch_images(post_url):
    """Collect the gallery image URLs from every page of a post.

    A headless Chrome session is started, each page of the post is loaded,
    and the ``src`` of every gallery ``<img>`` that starts with ``http`` is
    recorded. The browser is always shut down, even if a page load fails.

    Args:
        post_url: URL of the post's first page.

    Returns:
        A list of image URLs in the order they were found.
    """
    images = []
    total_pages = get_total_pages(post_url)
    logger.info(f"帖子 {post_url} 共有 {total_pages}")

    options = Options()
    # The '--headless' argument is sufficient; the deprecated
    # ``options.headless`` attribute is no longer set.
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-dev-shm-usage')  # Linux-specific setting

    # Pick the ChromeDriver location according to the operating system.
    if os.name == 'nt':  # Windows: driver bundled two directories above this file
        chrome_driver_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "utils", "chromedriver", "chromedriver.exe"
        )
    else:  # Linux
        chrome_driver_path = '/usr/bin/chromedriver'

    # Without a driver binary at that path, let Selenium locate one itself.
    if not os.path.exists(chrome_driver_path):
        driver = webdriver.Chrome(options=options)
        logger.debug("使用默认ChromeDriver")
    else:
        from selenium.webdriver.chrome.service import Service
        driver = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
        logger.debug(f"使用本地ChromeDriver: {chrome_driver_path}")

    # Avoid "//N" when the post URL ends with a slash.
    paged_root = post_url.rstrip('/')
    try:
        for page in range(1, total_pages + 1):
            url = f"{paged_root}/{page}" if page > 1 else post_url
            driver.get(url)
            time.sleep(2)  # give the page time to populate the gallery
            img_elements = driver.find_elements(
                By.CSS_SELECTOR, 'figure.wp-block-gallery figure.wp-block-image img'
            )
            for img in img_elements:
                img_url = img.get_attribute('src')
                if img_url and img_url.startswith('http'):
                    images.append(img_url)
            logger.info(f"已爬取 {url},找到 {len(img_elements)} 张图片")
    finally:
        # Always release the browser process, including on errors.
        driver.quit()
    return images
def download_image(img_url, folder_path, img_index, max_retries=3):
    """Download one image and store it as ``<img_index>.jpg`` (zero-padded to 3 digits).

    The response must carry an ``image/*`` content type; the payload is then
    converted to RGB and saved as a JPEG with quality 95. Failed attempts are
    retried with an increasing delay, and no delay is spent after the final
    attempt.

    Args:
        img_url: URL of the image.
        folder_path: Existing directory the JPEG is written to.
        img_index: Number used for the file name.
        max_retries: Maximum number of attempts.

    Returns:
        True when the image was saved, False when every attempt failed.
    """
    # Request headers are the same for every attempt, so build them once.
    local_headers = headers.copy()
    local_headers['Referer'] = img_url  # use the image URL itself as referer
    # Extra headers to resemble a real browser.
    local_headers.update({
        'Accept': 'image/avif,image/webp,image/apng,image/*,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache'
    })

    for attempt in range(max_retries):
        has_attempts_left = attempt < max_retries - 1
        try:
            response = requests.get(img_url, headers=local_headers, timeout=10)
            response.raise_for_status()

            # Verify that the server actually returned an image.
            content_type = response.headers.get('content-type', '')
            if not content_type.startswith('image/'):
                logger.info(f"尝试 {attempt + 1}/{max_retries}: 返回内容不是图片类型 ({content_type}),等待后重试...")
                if has_attempts_left:
                    time.sleep(2 * (attempt + 1))
                continue

            with Image.open(BytesIO(response.content)) as raw:
                img = raw.convert('RGB')
            img_name = f"{img_index:03d}.jpg"
            img_path = os.path.join(folder_path, img_name)
            img.save(img_path, 'JPEG', quality=95)
            logger.info(f"已下载并转换为JPG: {img_path}")
            return True
        except Exception as e:
            # Deliberately broad: any failure of a single attempt (network,
            # decoding, disk) is logged and retried rather than propagated.
            logger.warning(f"尝试 {attempt + 1}/{max_retries} 下载图片失败: {e}")
            if has_attempts_left:
                wait_time = random.uniform(2, 5) * (attempt + 1)  # randomised, growing delay
                logger.info(f"等待 {wait_time:.1f} 秒后重试...")
                time.sleep(wait_time)

    logger.warning(f"图片 {img_url} 下载失败,已达到最大重试次数")
    return False
def meitu_dowload_pic(dl_path, dl_url):
    """Download the images of up to 10 not-yet-downloaded posts into ``dl_path``.

    Each post gets its own sub-folder, named after the number following
    "No."/"Vol." in the title, or after the whole title when no such number
    is present.

    Args:
        dl_path: Directory that receives one sub-folder per post; created if missing.
        dl_url: URL of the listing page to crawl.

    Returns:
        ``dl_path`` after the batch has been processed, or ``None`` when no
        post was selected.
    """
    base_url = dl_url
    os.makedirs(dl_path, exist_ok=True)

    logger.info(f"开始爬取 {base_url} 的帖子...")
    posts = fetch_posts(base_url, dl_path, 10)
    if not posts:
        logger.info("未获取到符合条件的帖子,请检查选择器或网络连接。")
        return

    logger.info(f"成功选择 {len(posts)} 个未下载的帖子,开始下载图片...")
    for i, post in enumerate(posts, 1):
        logger.info(f"\n{i}. 标题: {post['title']}")
        logger.info(f" 链接: {post['url']}")

        # Matches "No.123" / "Vol.123" (any letter case) and captures the number.
        match = re.search(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)', post['title'])
        folder_name = match.group(1) if match else post['title']
        folder_path = os.path.join(dl_path, folder_name)
        os.makedirs(folder_path, exist_ok=True)

        images = fetch_images(post['url'])
        if images:
            logger.info(f"共找到 {len(images)} 张图片,开始下载...")
            for idx, img_url in enumerate(images, 1):
                if not download_image(img_url, folder_path, idx):
                    logger.info(f"图片 {img_url} 下载失败,继续下一张...")

        # Random pause between posts.
        time.sleep(random.uniform(3, 6))

    # Report the directory that was actually written to, not the module default.
    return dl_path
def meitu_dowload_pub_pic():
    """Run the downloader for the public site, writing into ``download_root``."""
    site_url = "https://www.mntuce.com/"
    meitu_dowload_pic(download_root, site_url)
# Run the public-site download when this file is executed as a script.
if __name__ == "__main__":
    meitu_dowload_pub_pic()

View File

@@ -0,0 +1,164 @@
import os
from reportlab.lib.pagesizes import A3
from reportlab.platypus import SimpleDocTemplate, Image
from PyPDF2 import PdfReader, PdfWriter
from PIL import Image as PILImage
import io
def _encode_jpeg(image, quality):
    """Return ``image`` encoded as JPEG bytes at the given quality."""
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG', quality=quality)
    return buffer.getvalue()


def compress_image(image_path, target_size_kb=300):
    """Re-encode an image as JPEG, aiming for roughly ``target_size_kb`` kilobytes.

    Files already at or below the target size are re-encoded once at quality
    85. Larger files start from a quality estimated from the size ratio and
    are refined with at most three bisection steps over the quality range
    10-95. An encoding within +/-10% of the target is accepted immediately;
    otherwise the best attempt seen is returned (the largest one not above
    110% of the target, or the smallest one if all attempts are larger).

    Args:
        image_path: Path of the image file to read.
        target_size_kb: Desired size of the encoded JPEG in kilobytes.

    Returns:
        The JPEG-encoded image as ``bytes``.
    """
    # Modes the JPEG writer accepts unchanged; anything else (RGBA, P, LA, ...)
    # is converted to RGB first.
    jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
    upper_limit_bytes = target_size_kb * 1.1 * 1024

    def rank(data):
        # Prefer encodings within the upper tolerance (bigger is better);
        # among oversized encodings prefer the smaller one.
        size = len(data)
        fits = size <= upper_limit_bytes
        return (fits, size if fits else -size)

    with PILImage.open(image_path) as source:
        img = source if source.mode in jpeg_modes else source.convert('RGB')

        original_size_kb = os.path.getsize(image_path) / 1024
        if original_size_kb <= target_size_kb:
            return _encode_jpeg(img, 85)

        # Initial guess: assume size scales roughly linearly with quality.
        quality = min(95, max(10, int(85 * (target_size_kb / original_size_kb))))
        candidate = _encode_jpeg(img, quality)
        best = candidate

        low, high = 10, 95
        for _ in range(3):
            size_kb = len(candidate) / 1024
            if target_size_kb * 0.9 < size_kb < target_size_kb * 1.1:
                return candidate
            # Narrow the search range around the quality that was just tried.
            if size_kb > target_size_kb:
                high = quality - 1
            else:
                low = quality + 1
            if low > high:
                break
            quality = (low + high) // 2
            candidate = _encode_jpeg(img, quality)
            best = max(best, candidate, key=rank)
        return best
def create_pdf_from_images(directory, output_pdf):
    """Build a PDF on A3 pages from all image files found under ``directory``.

    The directory tree is walked recursively, image files are ordered by
    path, each one is compressed with ``compress_image`` and scaled to a
    width of 800 points (or to a height of 991 points if it would otherwise
    be taller), preserving the aspect ratio. Nothing is written when no
    image file is found.

    Args:
        directory: Root directory to search for images.
        output_pdf: Path of the PDF file to create.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
    image_files = []

    # Walk the tree and keep files with a known image extension.
    for root, _dirs, files in os.walk(directory):
        for file in files:
            _, ext = os.path.splitext(file)
            if ext.lower() in image_extensions:
                image_files.append(os.path.join(root, file))

    if not image_files:
        print(f"未在 {directory} 中找到图片文件。")
        return

    # Order pages by file path.
    image_files.sort()

    doc = SimpleDocTemplate(output_pdf, pagesize=A3)

    max_width = 800
    # Height cap of 991 points, kept from the original value.
    # NOTE(review): the earlier comment mentioned an A3 height of 1191 points
    # while the expression starts from 1091 - confirm the intended cap.
    max_page_height = 1091 - 100

    image_list = []
    for image_file in image_files:
        compressed_image_data = compress_image(image_file)

        # Read the dimensions from the compressed data and release the handle.
        with PILImage.open(io.BytesIO(compressed_image_data)) as compressed_img:
            img_width, img_height = compressed_img.size

        # Scale to the target width, keeping the aspect ratio.
        target_width = max_width
        target_height = int(img_height * (target_width / img_width))
        # Too tall: scale to the height cap instead.
        if target_height > max_page_height:
            target_height = max_page_height
            target_width = int(target_height * (img_width / img_height))

        img = Image(io.BytesIO(compressed_image_data), width=target_width, height=target_height)
        image_list.append(img)

    doc.build(image_list)
    print(f"PDF {output_pdf} 创建成功。")
def encrypt_pdf(input_pdf, output_pdf, password):
    """Write a password-protected copy of ``input_pdf`` to ``output_pdf``.

    The input is read completely into memory and the encrypted document is
    serialised in memory before the output file is opened, so
    ``input_pdf`` and ``output_pdf`` may be the same path.

    Args:
        input_pdf: Path of the PDF to read.
        output_pdf: Path of the encrypted PDF to write.
        password: Password applied to the output.
    """
    with open(input_pdf, "rb") as file:
        source = io.BytesIO(file.read())

    reader = PdfReader(source)
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)
    writer.encrypt(password)

    # Serialise first; the output file is only truncated once this succeeded.
    encrypted = io.BytesIO()
    writer.write(encrypted)

    with open(output_pdf, "wb") as file:
        file.write(encrypted.getvalue())
    print(f"PDF {output_pdf} 加密成功。")
def generate_pdf_from_images(directory):
    """Create and encrypt a PDF for the first numeric folder that has none yet.

    Sub-folders of ``directory`` whose names consist only of digits are
    examined in ascending order (shorter names first). Folders whose
    ``./PDF/<name>.pdf`` already exists, or that yield no PDF because they
    contain no images, are skipped.

    Args:
        directory: Directory containing the digit-named image folders.

    Returns:
        The absolute path of the newly created PDF, or ``None`` when there is
        no digit-named folder or nothing new could be generated.
    """
    folder_names = [folder for folder in os.listdir(directory)
                    if os.path.isdir(os.path.join(directory, folder)) and folder.isdigit()]

    if not folder_names:
        print("未找到数字命名的文件夹。")
        return

    # os.listdir order is arbitrary; process folders in a stable order.
    folder_names.sort(key=lambda name: (len(name), name))

    for folder_name in folder_names:
        folder_path = os.path.join(directory, folder_name)
        output_pdf = f"./PDF/{folder_name}.pdf"

        if os.path.exists(output_pdf):
            print(f"PDF {output_pdf} 已存在,跳过...")
            continue

        # Create the directory the PDF is actually written to.
        os.makedirs(os.path.dirname(output_pdf), exist_ok=True)

        create_pdf_from_images(folder_path, output_pdf)
        if not os.path.exists(output_pdf):
            # No images in this folder, so there is nothing to encrypt.
            continue

        # Encrypt in place with the fixed password "4000".
        encrypt_pdf(output_pdf, output_pdf, "4000")

        # NOTE(review): returning here means one PDF is produced per call;
        # confirm this matches the intended placement of the return.
        return os.path.abspath(output_pdf)
# Example usage: process the digit-named folders of the current directory.
if __name__ == "__main__":
    generate_pdf_from_images("./")