Files
abot/plugins/xiuren_image/meitu_dl.py

245 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from loguru import logger
import requests
from bs4 import BeautifulSoup
import time
import os
import re
from urllib.parse import urljoin
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from PIL import Image
from io import BytesIO
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36',
'Referer': 'https://www.mntuce.com/'
}
seen_posts = set()
download_root = "/mnt/nfs_share" # 全局定义下载根目录
def fetch_posts(base_url, dl_path, posts_per_batch=10):
posts = []
page = 1
while len(posts) < posts_per_batch:
url = f"{base_url}/page/{page}" if page > 1 else base_url
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
post_elements = soup.select('posts.posts-item.card h2.item-heading a')
if not post_elements:
logger.info(f"页面 {page} 未找到帖子,停止爬取")
break
for post in post_elements:
post_url = urljoin(base_url, post.get('href'))
post_title = post.get_text().strip()
# 检查帖子是否已下载
match = re.search(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)', post_title) # 支持 "No." 或 "
folder_name = match.group(1) if match else f"unknown_{len(posts) + 1}"
folder_path = os.path.join(dl_path, folder_name)
if post_url not in seen_posts:
if os.path.exists(folder_path):
seen_posts.add(post_url) # 标记为已见过,避免重复检查
continue # 跳过已下载的帖子
seen_posts.add(post_url)
posts.append({'title': post_title, 'url': post_url})
if len(posts) == posts_per_batch:
break # 凑齐所需数量后退出内层循环
page += 1
time.sleep(1)
except requests.RequestException as e:
logger.info(f"请求 {url} 失败: {e}")
break
return posts
def get_total_pages(post_url):
try:
response = requests.get(post_url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
page_links = soup.select('p.post-nav-links a.post-page-numbers')
pages = [int(link.text) for link in page_links if link.text.isdigit()]
return max(pages) if pages else 1
except requests.RequestException as e:
logger.info(f"请求 {post_url} 失败默认1页: {e}")
return 1
def fetch_images(post_url):
driver = None
try:
images = []
total_pages = get_total_pages(post_url)
logger.info(f"帖子 {post_url} 共有 {total_pages}")
options = Options()
options.add_argument('--headless') # 使用新的headless模式
options.add_argument('--disable-gpu')
options.add_argument('--disable-dev-shm-usage') # 添加Linux特定配置
# 添加更多参数以确保Chrome进程能够正确退出
options.add_argument('--no-sandbox')
options.add_argument('--disable-extensions')
options.add_argument('--disable-software-rasterizer')
options.add_argument('--remote-debugging-port=0')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
options.headless = True
# 根据操作系统选择不同的ChromeDriver路径处理方式
if os.name == 'nt': # Windows
chrome_driver_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"utils", "chromedriver", "chromedriver.exe"
)
else: # Linux
chrome_driver_path = '/usr/bin/chromedriver' # 使用系统PATH中的chromedriver
# 如果本地没有chromedriver.exe则使用默认方式
if not os.path.exists(chrome_driver_path):
driver = webdriver.Chrome(options=options)
logger.debug("使用默认ChromeDriver")
else:
from selenium.webdriver.chrome.service import Service
driver = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
logger.debug(f"使用本地ChromeDriver: {chrome_driver_path}")
for page in range(1, total_pages + 1):
url = f"{post_url}/{page}" if page > 1 else post_url
driver.get(url)
time.sleep(2)
img_elements = driver.find_elements(By.CSS_SELECTOR, 'figure.wp-block-gallery figure.wp-block-image img')
for img in img_elements:
img_url = img.get_attribute('src')
if img_url and img_url.startswith('http'):
images.append(img_url)
logger.info(f"已爬取 {url},找到 {len(img_elements)} 张图片")
# 不在这里调用quit统一在finally中处理
return images
except Exception as e:
logger.info(f"爬取 {post_url} 失败: {e}")
return []
finally:
# 确保在所有情况下都能正确关闭Chrome浏览器
try:
if driver is not None:
driver.quit()
logger.debug("Chrome浏览器已正常关闭")
except Exception as e:
logger.error(f"关闭Chrome浏览器时出错: {e}")
# 在极端情况下尝试使用系统命令强制终止Chrome进程
try:
if os.name == 'nt': # Windows
os.system('taskkill /f /im chrome.exe /t')
else: # Linux
os.system('pkill -f chrome')
logger.warning("已尝试强制终止Chrome进程")
except Exception as kill_error:
logger.error(f"强制终止Chrome进程失败: {kill_error}")
def download_image(img_url, folder_path, img_index, max_retries=3):
for attempt in range(max_retries):
try:
# 构建特定的headers
local_headers = headers.copy()
local_headers['Referer'] = img_url # 使用图片URL作为referer
# 添加一些额外的headers模拟真实浏览器
local_headers.update({
'Accept': 'image/avif,image/webp,image/apng,image/*,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
})
response = requests.get(img_url, headers=local_headers, timeout=10)
response.raise_for_status()
# 验证内容类型
content_type = response.headers.get('content-type', '')
if not content_type.startswith('image/'):
logger.info(f"尝试 {attempt + 1}/{max_retries}: 返回内容不是图片类型 ({content_type}),等待后重试...")
time.sleep(2 * (attempt + 1))
continue
with Image.open(BytesIO(response.content)).convert('RGB') as img:
img_name = f"{img_index:03d}.jpg"
img_path = os.path.join(folder_path, img_name)
img.save(img_path, 'JPEG', quality=95)
logger.info(f"已下载并转换为JPG: {img_path}")
return True
except Exception as e:
logger.warning(f"尝试 {attempt + 1}/{max_retries} 下载图片失败: {e}")
if attempt < max_retries - 1:
wait_time = random.uniform(2, 5) * (attempt + 1) # 随机递增等待时间
logger.info(f"等待 {wait_time:.1f} 秒后重试...")
time.sleep(wait_time)
continue
logger.warning(f"图片 {img_url} 下载失败,已达到最大重试次数")
return False
def meitu_dowload_pic(dl_path, dl_url):
base_url = dl_url
if not os.path.exists(dl_path):
os.makedirs(dl_path)
logger.info(f"开始爬取 {base_url} 的帖子...")
posts = fetch_posts(base_url, dl_path, 10)
if not posts:
logger.info("未获取到符合条件的帖子,请检查选择器或网络连接。")
return
logger.info(f"成功选择 {len(posts)} 个未下载的帖子,开始下载图片...")
for i, post in enumerate(posts, 1):
logger.info(f"\n{i}. 标题: {post['title']}")
logger.info(f" 链接: {post['url']}")
match = re.search(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)', post['title']) # 支持 "No." 或 "
folder_name = match.group(1) if match else post['title']
folder_path = os.path.join(dl_path, folder_name)
os.makedirs(folder_path, exist_ok=True) # 创建目录exist_ok=True 避免重复创建报错
images = fetch_images(post['url'])
if images:
logger.info(f"共找到 {len(images)} 张图片,开始下载...")
for idx, img_url in enumerate(images, 1):
# 增加随机延时
if not download_image(img_url, folder_path, idx):
logger.info(f"图片 {img_url} 下载失败,继续下一张...")
continue
# 每个帖子之间增加随机延时
time.sleep(random.uniform(3, 6))
return download_root
def meitu_dowload_pub_pic():
meitu_dowload_pic(download_root, "https://www.mntuce.com/")
if __name__ == "__main__":
meitu_dowload_pub_pic()