"""Scrape image posts from a WordPress-style gallery site and save them as JPEGs.

Workflow: ``fetch_posts`` collects posts that have not been downloaded yet,
``fetch_images`` renders every page of a post in headless Chrome and gathers
the image URLs, and ``download_image`` fetches each image and re-encodes it
as JPEG under a per-post folder.
"""
import os
import random
import re
import time
from io import BytesIO
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from loguru import logger
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36',
    'Referer': 'https://www.mntuce.com/'
}

# Post URLs already examined during this process, so they are not re-checked.
seen_posts = set()
# Global download root directory.
download_root = "/mnt/nfs_share"

# Matches the issue number in titles such as "No.123" or "Vol.45"
# (case-insensitive prefix, captured digits).
_ISSUE_NUMBER_RE = re.compile(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)')
# Characters that cannot appear in a file name on common platforms.
_UNSAFE_NAME_CHARS_RE = re.compile(r'[\\/:*?"<>|]')


def _folder_name_for_title(title):
    """Return the folder name under which a post with *title* is stored.

    The issue number is used when the title carries one; otherwise the title
    itself is used with path-unsafe characters replaced by ``_``. Both the
    "already downloaded" check and the download step call this helper so
    they always agree on the location.
    """
    match = _ISSUE_NUMBER_RE.search(title)
    if match:
        return match.group(1)
    return _UNSAFE_NAME_CHARS_RE.sub('_', title).strip() or 'untitled'


def fetch_posts(base_url, dl_path, posts_per_batch=10):
    """Collect up to *posts_per_batch* posts that have not been downloaded yet.

    Listing pages are walked starting at *base_url*, then ``<base>/page/N``.
    A post is skipped when its URL was already seen in this process or when
    its target folder already exists under *dl_path*.

    Args:
        base_url: URL of the first listing page.
        dl_path: Directory that holds one sub-folder per downloaded post.
        posts_per_batch: Maximum number of posts to return.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts; possibly shorter than
        *posts_per_batch* (or empty) when the listing runs out or a request
        fails.
    """
    posts = []
    page = 1
    # Avoid "//page/N" when base_url already ends with a slash.
    listing_root = base_url.rstrip('/')
    while len(posts) < posts_per_batch:
        url = f"{listing_root}/page/{page}" if page > 1 else base_url
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.warning(f"请求 {url} 失败: {e}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        post_elements = soup.select('posts.posts-item.card h2.item-heading a')
        if not post_elements:
            logger.info(f"页面 {page} 未找到帖子,停止爬取")
            break

        for post in post_elements:
            href = post.get('href')
            if not href:
                # urljoin(base, None) would return the listing page itself.
                continue
            post_url = urljoin(base_url, href)
            post_title = post.get_text().strip()

            if post_url not in seen_posts:
                # Remember the URL either way so it is not examined again.
                seen_posts.add(post_url)
                folder_path = os.path.join(dl_path, _folder_name_for_title(post_title))
                if os.path.exists(folder_path):
                    continue  # Already downloaded.
                posts.append({'title': post_title, 'url': post_url})

            if len(posts) == posts_per_batch:
                break  # Batch is full; stop scanning this page.

        page += 1
        time.sleep(1)
    return posts


def get_total_pages(post_url):
    """Return the number of pages a post is split into (at least 1).

    The count is the largest numeric label among the post's pagination
    links. A failed request is logged and treated as a single page.
    """
    try:
        response = requests.get(post_url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.warning(f"请求 {post_url} 失败,默认1页: {e}")
        return 1

    soup = BeautifulSoup(response.text, 'html.parser')
    page_links = soup.select('p.post-nav-links a.post-page-numbers')
    labels = (link.text.strip() for link in page_links)
    pages = [int(label) for label in labels if label.isdigit()]
    return max(pages) if pages else 1


def _build_chrome_options():
    """Build the headless Chrome options used for scraping."""
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-dev-shm-usage')  # Linux-specific setting.
    # Extra switches intended to help the Chrome process exit cleanly.
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-extensions')
    options.add_argument('--disable-software-rasterizer')
    options.add_argument('--remote-debugging-port=0')
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    return options


def _create_driver(options):
    """Start Chrome, preferring a local chromedriver binary when one exists."""
    if os.name == 'nt':  # Windows: bundled driver two directories above this file.
        chrome_driver_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "utils", "chromedriver", "chromedriver.exe"
        )
    else:  # Linux: system-wide chromedriver.
        chrome_driver_path = '/usr/bin/chromedriver'

    if not os.path.exists(chrome_driver_path):
        # No local binary: let Selenium locate a driver on its own.
        driver = webdriver.Chrome(options=options)
        logger.debug("使用默认ChromeDriver")
    else:
        from selenium.webdriver.chrome.service import Service
        driver = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
        logger.debug(f"使用本地ChromeDriver: {chrome_driver_path}")
    return driver


def _quit_driver(driver):
    """Close the browser, force-killing Chrome if a normal quit fails."""
    if driver is None:
        return
    try:
        driver.quit()
        logger.debug("Chrome浏览器已正常关闭")
    except Exception as e:
        logger.error(f"关闭Chrome浏览器时出错: {e}")
        # Last resort: terminate Chrome through the operating system.
        # NOTE: this kills every Chrome process on the machine, not only ours.
        try:
            if os.name == 'nt':  # Windows
                os.system('taskkill /f /im chrome.exe /t')
            else:  # Linux
                os.system('pkill -f chrome')
            logger.warning("已尝试强制终止Chrome进程")
        except Exception as kill_error:
            logger.error(f"强制终止Chrome进程失败: {kill_error}")


def fetch_images(post_url):
    """Return the absolute image URLs found on every page of a post.

    Each page is rendered in headless Chrome and the ``src`` of every
    gallery image is collected. Any failure is logged and yields an empty
    list; the browser is always shut down before returning.
    """
    driver = None
    try:
        images = []
        total_pages = get_total_pages(post_url)
        logger.info(f"帖子 {post_url} 共有 {total_pages} 页")

        driver = _create_driver(_build_chrome_options())

        # Avoid "//N" when post_url already ends with a slash.
        post_root = post_url.rstrip('/')
        for page in range(1, total_pages + 1):
            url = f"{post_root}/{page}" if page > 1 else post_url
            driver.get(url)
            time.sleep(2)  # Give the page time to render.
            img_elements = driver.find_elements(
                By.CSS_SELECTOR, 'figure.wp-block-gallery figure.wp-block-image img'
            )
            for img in img_elements:
                img_url = img.get_attribute('src')
                if img_url and img_url.startswith('http'):
                    images.append(img_url)
            logger.info(f"已爬取 {url},找到 {len(img_elements)} 张图片")
        return images
    except Exception as e:
        logger.warning(f"爬取 {post_url} 失败: {e}")
        return []
    finally:
        # Shut Chrome down on every exit path.
        _quit_driver(driver)


def download_image(img_url, folder_path, img_index, max_retries=3):
    """Download one image and store it as ``<img_index:03d>.jpg``.

    Args:
        img_url: Absolute URL of the image.
        folder_path: Existing directory to write the JPEG into.
        img_index: Number used for the zero-padded file name.
        max_retries: Number of attempts before giving up.

    Returns:
        True when the image was saved, False when every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            local_headers = headers.copy()
            local_headers['Referer'] = img_url  # Use the image URL as referer.
            # Additional headers that mimic a real browser.
            local_headers.update({
                'Accept': 'image/avif,image/webp,image/apng,image/*,*/*;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Cache-Control': 'no-cache',
                'Pragma': 'no-cache'
            })

            response = requests.get(img_url, headers=local_headers, timeout=10)
            response.raise_for_status()

            # A non-image body is retried after a growing delay.
            content_type = response.headers.get('content-type', '')
            if not content_type.startswith('image/'):
                logger.info(f"尝试 {attempt + 1}/{max_retries}: 返回内容不是图片类型 ({content_type}),等待后重试...")
                time.sleep(2 * (attempt + 1))
                continue

            img_name = f"{img_index:03d}.jpg"
            img_path = os.path.join(folder_path, img_name)
            # Close the decoded source image as well as the converted copy.
            with Image.open(BytesIO(response.content)) as source_img:
                source_img.convert('RGB').save(img_path, 'JPEG', quality=95)
            logger.info(f"已下载并转换为JPG: {img_path}")
            return True

        except Exception as e:
            logger.warning(f"尝试 {attempt + 1}/{max_retries} 下载图片失败: {e}")
            if attempt < max_retries - 1:
                # Randomised wait that grows with each attempt.
                wait_time = random.uniform(2, 5) * (attempt + 1)
                logger.info(f"等待 {wait_time:.1f} 秒后重试...")
                time.sleep(wait_time)

    logger.warning(f"图片 {img_url} 下载失败,已达到最大重试次数")
    return False


def meitu_dowload_pic(dl_path, dl_url):
    """Download a batch of not-yet-downloaded posts from *dl_url* into *dl_path*.

    Args:
        dl_path: Directory to store the posts in; created when missing.
        dl_url: URL of the site's first listing page.

    Returns:
        *dl_path* after the batch was processed, or None when no suitable
        post was found.
    """
    base_url = dl_url
    os.makedirs(dl_path, exist_ok=True)

    logger.info(f"开始爬取 {base_url} 的帖子...")
    posts = fetch_posts(base_url, dl_path, 10)

    if not posts:
        logger.info("未获取到符合条件的帖子,请检查选择器或网络连接。")
        return None

    logger.info(f"成功选择 {len(posts)} 个未下载的帖子,开始下载图片...")
    for i, post in enumerate(posts, 1):
        logger.info(f"\n{i}. 标题: {post['title']}")
        logger.info(f" 链接: {post['url']}")

        folder_path = os.path.join(dl_path, _folder_name_for_title(post['title']))

        images = fetch_images(post['url'])
        if images:
            # Create the folder only once there is something to store, so a
            # failed scrape does not mark the post as already downloaded.
            os.makedirs(folder_path, exist_ok=True)
            logger.info(f"共找到 {len(images)} 张图片,开始下载...")
            for idx, img_url in enumerate(images, 1):
                if not download_image(img_url, folder_path, idx):
                    logger.info(f"图片 {img_url} 下载失败,继续下一张...")

        # Random pause between posts; pointless after the last one.
        if i < len(posts):
            time.sleep(random.uniform(3, 6))
    return dl_path


def meitu_dowload_pub_pic():
    """Run a download batch against the default site and download root."""
    meitu_dowload_pic(download_root, "https://www.mntuce.com/")


if __name__ == "__main__":
    meitu_dowload_pub_pic()