"""Scrape image posts from mntuce.com and save every picture as a JPEG.

Workflow: list posts from an index/tag page (``fetch_posts``), discover how
many pages each post has (``get_total_pages``), collect the image URLs with a
headless Chrome session (``fetch_images``) and download/convert each image
(``download_image``). ``meitu_dowload_pic`` ties these steps together.
"""
import os
import re
import time
from io import BytesIO
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36',
    'Referer': 'https://www.mntuce.com/'
}
# URLs of posts already examined during this process (downloaded or queued).
seen_posts = set()
download_root = "xiuren"  # Global download root directory.
download_root_heisi = 'xiuren/heisi'

# Matches the issue number in titles such as "No.1234" or "Vol.567".
_ISSUE_NUMBER_RE = re.compile(r'(?:[Nn][Oo]|[Vv][Oo][Ll])\.(\d+)')
# Characters that cannot appear in a directory name on common filesystems.
_UNSAFE_PATH_CHARS_RE = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


def _folder_name_for(title):
    """Return the download folder name used for a post with the given title.

    The issue number after "No." / "Vol." is used when present; otherwise the
    title itself is used with path-unsafe characters replaced by "_".
    Both the "already downloaded?" check and the actual download call this
    helper so that they always agree on the folder name.
    """
    match = _ISSUE_NUMBER_RE.search(title)
    if match:
        return match.group(1)
    cleaned = _UNSAFE_PATH_CHARS_RE.sub('_', title).strip(' .')
    return cleaned or 'unknown'


def fetch_posts(base_url, dl_path, posts_per_batch=10):
    """Collect up to ``posts_per_batch`` posts that have not been downloaded.

    Args:
        base_url: Index or tag page to start from; later pages are requested
            as ``<base_url>/page/<n>``.
        dl_path: Directory holding the per-post download folders; a post whose
            folder already exists there is skipped.
        posts_per_batch: Maximum number of posts to return.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts. May be shorter than
        ``posts_per_batch`` if the listing runs out or a request fails.
    """
    posts = []
    page = 1
    # Strip the trailing slash so paginated URLs do not contain "//page/".
    listing_root = base_url.rstrip('/')
    while len(posts) < posts_per_batch:
        url = f"{listing_root}/page/{page}" if page > 1 else base_url
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"请求 {url} 失败: {e}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        post_elements = soup.select('posts.posts-item.card h2.item-heading a')
        if not post_elements:
            print(f"页面 {page} 未找到帖子,停止爬取")
            break

        for post in post_elements:
            href = post.get('href')
            if not href:
                # An anchor without a target cannot be followed.
                continue
            post_url = urljoin(base_url, href)
            if post_url in seen_posts:
                continue
            # Mark as seen whether it is skipped or queued, to avoid
            # re-checking the same post on later calls.
            seen_posts.add(post_url)

            post_title = post.get_text().strip()
            folder_path = os.path.join(dl_path, _folder_name_for(post_title))
            if os.path.exists(folder_path):
                continue  # Already downloaded.

            posts.append({'title': post_title, 'url': post_url})
            if len(posts) >= posts_per_batch:
                break  # Enough posts collected; leave the inner loop.

        page += 1
        time.sleep(1)  # Be polite to the server between listing pages.
    return posts


def get_total_pages(post_url):
    """Return the number of pages of a post, defaulting to 1.

    The count is the largest numeric label among the post's pagination links.
    A failed request is reported and treated as a single-page post.
    """
    try:
        response = requests.get(post_url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"请求 {post_url} 失败,默认1页: {e}")
        return 1

    soup = BeautifulSoup(response.text, 'html.parser')
    page_links = soup.select('p.post-nav-links a.post-page-numbers')
    labels = (link.get_text(strip=True) for link in page_links)
    pages = [int(label) for label in labels if label.isdigit()]
    return max(pages) if pages else 1


def _create_driver():
    """Start a headless Chrome WebDriver, preferring a bundled chromedriver."""
    options = Options()
    # The ``headless`` attribute was deprecated and later removed in
    # Selenium 4; the command-line switch works across versions.
    options.add_argument("--headless")

    # Use a fixed local ChromeDriver path to avoid automatic updates on
    # every run.
    chrome_driver_path = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "utils", "chromedriver", "chromedriver.exe"
    )

    # Fall back to the default lookup when no local chromedriver.exe exists.
    if not os.path.exists(chrome_driver_path):
        driver = webdriver.Chrome(options=options)
        print("使用默认ChromeDriver")
    else:
        from selenium.webdriver.chrome.service import Service
        driver = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
        print(f"使用本地ChromeDriver: {chrome_driver_path}")
    return driver


def fetch_images(post_url):
    """Return the absolute image URLs found on every page of a post.

    Pages are rendered with headless Chrome; the browser is always shut down,
    even if loading a page raises.
    """
    images = []
    total_pages = get_total_pages(post_url)
    print(f"帖子 {post_url} 共有 {total_pages} 页")

    # Strip the trailing slash so page URLs do not contain "//<n>".
    post_root = post_url.rstrip('/')
    driver = _create_driver()
    try:
        for page in range(1, total_pages + 1):
            url = f"{post_root}/{page}" if page > 1 else post_url
            driver.get(url)
            time.sleep(2)  # Give the page time to render its gallery.
            img_elements = driver.find_elements(
                By.CSS_SELECTOR,
                'figure.wp-block-gallery figure.wp-block-image img'
            )
            for img in img_elements:
                img_url = img.get_attribute('src')
                if img_url and img_url.startswith('http'):
                    images.append(img_url)
            print(f"已爬取 {url},找到 {len(img_elements)} 张图片")
    finally:
        # Always release the browser process, even on failure.
        driver.quit()
    return images


def download_image(img_url, folder_path, img_index):
    """Download one image and store it as ``<img_index:03d>.jpg``.

    The image is converted to RGB and saved as JPEG (quality 95) inside
    ``folder_path``. Any failure is reported and swallowed so that one bad
    image does not abort the rest of the post.
    """
    try:
        response = requests.get(img_url, headers=headers, timeout=10)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content)).convert('RGB')
        img_name = f"{img_index:03d}.jpg"
        img_path = os.path.join(folder_path, img_name)
        img.save(img_path, 'JPEG', quality=95)
        print(f"已下载并转换为JPG: {img_path}")
    except Exception as e:  # Deliberate best-effort: report and move on.
        print(f"处理图片 {img_url} 失败: {e}")


def meitu_dowload_pic(dl_path, dl_url):
    """Download the images of up to 10 not-yet-downloaded posts.

    Args:
        dl_path: Directory that receives one sub-folder per post.
        dl_url: Listing page (site root or tag page) to take posts from.

    Returns:
        The module-level ``download_root`` after processing, or ``None`` when
        no eligible post was found. Note that the return value is the global
        root, not ``dl_path``.
    """
    base_url = dl_url
    os.makedirs(dl_path, exist_ok=True)

    print(f"开始爬取 {base_url} 的帖子...")
    posts = fetch_posts(base_url, dl_path, 10)
    if not posts:
        print("未获取到符合条件的帖子,请检查选择器或网络连接。")
        return

    print(f"成功选择 {len(posts)} 个未下载的帖子,开始下载图片...")
    for i, post in enumerate(posts, 1):
        print(f"\n{i}. 标题: {post['title']}")
        print(f" 链接: {post['url']}")

        # Same naming rule as the "already downloaded" check in fetch_posts.
        folder_path = os.path.join(dl_path, _folder_name_for(post['title']))
        os.makedirs(folder_path, exist_ok=True)

        images = fetch_images(post['url'])
        if images:
            print(f"共找到 {len(images)} 张图片,开始下载...")
            for idx, img_url in enumerate(images, 1):
                download_image(img_url, folder_path, idx)
        else:
            print("未找到图片,可能需要调整策略。")
        time.sleep(1)  # Pause between posts.
    return download_root


def meitu_dowload_pub_pic():
    """Download new posts from the site's front page into ``download_root``."""
    meitu_dowload_pic(download_root, "https://www.mntuce.com/")


def meitu_dowload_heisi_pic():
    """Download new posts from the tag page into ``download_root_heisi``."""
    meitu_dowload_pic(download_root_heisi, "https://www.mntuce.com/tag/%e4%b8%9d%e7%a4%be")


if __name__ == "__main__":
    meitu_dowload_heisi_pic()