"""Forum crawler: walks board list pages, opens each new thread, and stores
the extracted magnet link, cover image and performer name in MySQL.

The crawl is driven through a real Chrome instance (undetected_chromedriver);
pages are parsed with BeautifulSoup.
"""
import os
import re
import time
from datetime import datetime, timedelta
from urllib.parse import urljoin

import mysql.connector
import requests
import undetected_chromedriver as uc
from bs4 import BeautifulSoup
from loguru import logger
from mysql.connector import Error
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

if os.name == 'nt':
    # On Windows the uc.Chrome destructor is replaced with a no-op.
    # NOTE(review): presumably this avoids errors raised by __del__ during
    # interpreter shutdown -- confirm against the uc version in use.
    try:
        uc.Chrome.__del__ = lambda self: None
    except Exception:
        pass

# ================= Configuration =================
# Run mode: 'full' (crawl everything) or 'daily' (incremental).
RUN_MODE = 'daily'

# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file that is not committed.
DB_CONFIG = {
    'host': '192.168.2.41',
    'port': 3306,
    'user': 'root',                 # [edit me] database user
    'password': 'lw123456',         # [edit me] database password
    'database': 'message_archive',  # [edit me] schema name ('database' here, 'db' in pymysql)
    'charset': 'utf8mb4',
    'use_pure': True,               # optional: pure-Python implementation, avoids C-extension issues
}

# Board id -> category name stored alongside every post from that board.
TARGET_FIDS = {
    103: '高清中文字幕',
    104: '素人有码系列',
    37: '亚洲有码原创',
    36: '亚洲无码原创',
    39: '动漫原创',
    160: 'vr',
    151: '4k',
    2: '国产原创',
    38: '欧美无码',
    107: '三级写真',
    152: '韩国主播',
}

# Stop-words that must never be taken for a performer name when guessing
# from the thread title (fallback strategy).
BLACKLIST_KEYWORDS = {
    '高清', '中文', '字幕', '无码', '有码', '步兵', '骑兵', '破解', '流出',
    '合集', '系列', '下载', '推荐', '新作', '大片', '偷拍', '自拍', '精选',
    '汉化', '日韩', '欧美', '国产', '主播', '成人', '视频', '影片', '最新',
    '强力', '严选', '首发', '独家', '今日', '更新', '特辑', '重磅', '清晰',
    '完整', '版', '部', '集', '天', '月', '年', '号', '期',
}

if RUN_MODE == 'full':
    MAX_PAGES_PER_FID = 500
    ONLY_CRAWL_TODAY = False
else:
    MAX_PAGES_PER_FID = 5
    ONLY_CRAWL_TODAY = True

BASE_URL = "https://www.sehuatang.net/"
DATE_FORMAT = '%Y-%m-%d'

# A magnet URI: the btih hash followed by any trailing parameters, stopping at
# whitespace, quotes, angle brackets or CJK characters (e.g. a "copy code"
# label glued to the link by get_text()).
MAGNET_RE = re.compile(
    r'(magnet:\?xt=urn:btih:[a-zA-Z0-9]+[^\s"\'<>\u4e00-\u9fa5]*)'
)

# Designation patterns, tried in order; the first hit wins.
DESIGNATION_PATTERNS = tuple(
    re.compile(p)
    for p in (
        r'FC2-PPV-\d+',
        r'\d{6}-\d{3}',
        r'[A-Z]{2,5}-\d{3,5}',
        r'T28-\d{3}',
        r'[A-Z]{2,5}\d{3,5}',
    )
)

# "[actress]: name" / "【出演女优】:name" line in the post body.  Both the ASCII
# and the full-width colon are accepted.
ACTRESS_RE = re.compile(
    r'(?:【|\[)(?:出演)?女优(?:】|\])\s*[::]\s*([^\n<]+)'
)

# (XPath, timeout in seconds) for the age-gate link, tried in order.
AGE_GATE_LOCATORS = (
    ('//a[contains(text(), "满18岁,请点此进入")]', 6),
    ('//a[contains(text(), "满18岁")]', 4),
)


class SehuatangCrawler:
    """Crawl the configured boards and persist new posts to ``forum_posts``."""

    def __init__(self):
        """Open the DB connection, ensure the table exists and start Chrome.

        Raises:
            mysql.connector.Error: if the database connection fails.
            Exception: whatever the driver start-up raises; resources that
                were already opened are released before re-raising.
        """
        self.conn = None
        self.driver = None
        self.session = None
        self.today_str = datetime.now().strftime(DATE_FORMAT)

        self._connect_db()
        try:
            self._init_db_table()
            self.driver = self._init_driver()
        except Exception:
            # Do not leak the DB connection if the browser fails to start.
            self.close()
            raise

    def _connect_db(self):
        """Connect to MySQL using ``DB_CONFIG``; re-raises connection errors."""
        try:
            self.conn = mysql.connector.connect(**DB_CONFIG)
            if self.conn.is_connected():
                logger.info("数据库连接成功")
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def _init_db_table(self):
        """Create the ``forum_posts`` table if missing (errors are logged only)."""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS forum_posts (
            id INT AUTO_INCREMENT PRIMARY KEY,
            tid VARCHAR(50) NOT NULL UNIQUE COMMENT '帖子ID',
            fid INT NOT NULL COMMENT '板块ID',
            category_name VARCHAR(50) COMMENT '板块名称',
            designation VARCHAR(100) COMMENT '番号',
            actress VARCHAR(100) COMMENT '出演女优',
            title VARCHAR(255) NOT NULL COMMENT '标题',
            magnet_link TEXT COMMENT '磁力链接',
            cover_image VARCHAR(500) COMMENT '封面图URL',
            post_url VARCHAR(255) NOT NULL COMMENT '帖子链接',
            publish_date VARCHAR(20) COMMENT '发布时间',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_designation (designation),
            INDEX idx_actress (actress),
            INDEX idx_publish_date (publish_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(create_table_sql)
            self.conn.commit()
        except Error as e:
            logger.error(f"表结构初始化失败: {e}")
        finally:
            if cursor:
                cursor.close()

    def _init_driver(self):
        """Start and return a visible (non-headless) Chrome instance."""
        options = uc.ChromeOptions()
        # Key settings for evading bot detection.  If detection still
        # triggers in headless mode, keep this False on the first run and
        # pass the check manually.
        options.headless = False
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument('--disable-dev-shm-usage')
        return uc.Chrome(options=options)

    def bypass_age_verification(self):
        """Open the home page, click through the age gate if present, and
        mirror the browser's User-Agent and cookies into ``self.session``.

        Best effort: any failure is logged as a warning and swallowed.
        """
        try:
            self.driver.get(BASE_URL)

            for xpath, timeout in AGE_GATE_LOCATORS:
                try:
                    button = WebDriverWait(self.driver, timeout).until(
                        EC.element_to_be_clickable((By.XPATH, xpath))
                    )
                    button.click()
                    logger.success("通过年龄验证")
                    break
                except Exception:
                    # Gate not shown under this wording; try the next one.
                    continue

            ua = self.driver.execute_script("return navigator.userAgent")
            self.session = requests.Session()
            self.session.headers.update({'User-Agent': ua, 'Referer': BASE_URL})
            for c in self.driver.get_cookies():
                try:
                    self.session.cookies.set(
                        c['name'], c['value'],
                        domain=c.get('domain'), path=c.get('path', '/'),
                    )
                except Exception:
                    self.session.cookies.set(c['name'], c['value'])
        except Exception as e:
            logger.warning(f"主页访问异常: {e}")

    def parse_relative_date(self, date_str):
        """Convert a forum timestamp to a zero-padded ``YYYY-MM-DD`` string.

        Understands relative forms ("刚刚", "N 秒前", "N 分钟前", "N 小时前",
        "半小时前", "昨天", "前天", "N 天前") and absolute forms
        (``YYYY-M-D`` or ``M-D``, optionally followed by other text such as a
        time).  ``M-D`` is assumed to be in the current year unless that would
        put it in the future, in which case the previous year is used.

        Args:
            date_str: raw text taken from the thread list; ``None`` is treated
                as empty.

        Returns:
            The date as ``YYYY-MM-DD``.  Anything unrecognised or invalid
            falls back to today's date.
        """
        now = datetime.now()
        today = now.strftime(DATE_FORMAT)
        text = (date_str or "").strip()

        def ago(**delta):
            return (now - timedelta(**delta)).strftime(DATE_FORMAT)

        try:
            if "秒前" in text or "刚刚" in text:
                return today

            number = re.search(r'\d+', text)
            amount = int(number.group()) if number else None

            if "分钟前" in text:
                return ago(minutes=amount) if amount is not None else today
            if "小时前" in text:
                if amount is not None:
                    return ago(hours=amount)
                # "半小时前" (half an hour ago) carries no digits.
                return ago(minutes=30) if "半" in text else today
            if "昨天" in text:
                return ago(days=1)
            if "前天" in text:
                return ago(days=2)
            if "天前" in text:
                return ago(days=amount) if amount is not None else today

            full = re.match(r'(\d{4})-(\d{1,2})-(\d{1,2})', text)
            if full:
                year, month, day = (int(g) for g in full.groups())
                # Normalise so the value is comparable with today_str.
                return datetime(year, month, day).strftime(DATE_FORMAT)

            short = re.match(r'(\d{1,2})-(\d{1,2})', text)
            if short:
                month, day = (int(g) for g in short.groups())
                candidate = datetime(now.year, month, day)
                if candidate.date() > now.date():
                    # e.g. "12-30" seen in January belongs to last year.
                    candidate = candidate.replace(year=now.year - 1)
                return candidate.strftime(DATE_FORMAT)

            return today
        except (ValueError, OverflowError):
            # Impossible calendar date or absurdly large offset.
            return today

    def extract_designation(self, title):
        """Return the first designation code found in ``title`` (upper-cased),
        or ``""`` when none of the known patterns match."""
        upper_title = title.upper()
        for pattern in DESIGNATION_PATTERNS:
            match = pattern.search(upper_title)
            if match:
                return match.group(0)
        return ""

    # ============ Strategy B: guess the performer from the title ============
    def extract_actress_from_title(self, title, designation):
        """Fallback used when the post body names no performer.

        Removes the designation, bracketed segments and common format tags,
        keeps only CJK characters, and returns the first 2-4 character token
        that is not in ``BLACKLIST_KEYWORDS``.

        Returns:
            The guessed name, or ``""`` if no token qualifies.
        """
        if designation:
            title = re.sub(re.escape(designation), '', title, flags=re.IGNORECASE)
        title = re.sub(r'(?i)\[.*?\]|【.*?】|\(.*?\)|mp4|1080p|720p|4k', ' ', title)
        cjk_only = re.sub(r'[^\u4e00-\u9fa5\s]', ' ', title)
        return next(
            (
                token
                for token in cjk_only.split()
                if 2 <= len(token) <= 4 and token not in BLACKLIST_KEYWORDS
            ),
            "",
        )

    def check_exists(self, tid, designation):
        """Return True if a row with this ``tid`` -- or, when given, this
        ``designation`` -- is already stored.

        A database error is logged and reported as "not existing" (False).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT id FROM forum_posts WHERE tid = %s LIMIT 1", (tid,))
            if cursor.fetchone():
                return True
            if designation:
                cursor.execute(
                    "SELECT id FROM forum_posts WHERE designation = %s LIMIT 1",
                    (designation,),
                )
                if cursor.fetchone():
                    return True
            return False
        except Error as e:
            logger.warning(f"查重失败: {e}")
            return False
        finally:
            if cursor:
                cursor.close()

    def clean_magnet(self, magnet_text):
        """Strip the "复制代码" label and return the bare magnet URI.

        Returns ``""`` for empty input; if no magnet URI is recognised the
        stripped text is returned unchanged.
        """
        if not magnet_text:
            return ""
        magnet_text = magnet_text.replace('复制代码', '').strip()
        match = MAGNET_RE.search(magnet_text)
        return match.group(1) if match else magnet_text

    def save_to_db(self, data):
        """Insert one post, or refresh title/actress/magnet/cover when the
        ``tid`` already exists.  DB errors are logged and rolled back.

        Args:
            data: dict with keys ``tid, fid, category_name, designation,
                actress, title, magnet_link, cover_image, post_url,
                publish_date``.
        """
        sql = """
            INSERT INTO forum_posts
                (tid, fid, category_name, designation, actress, title,
                 magnet_link, cover_image, post_url, publish_date)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                title = VALUES(title),
                actress = VALUES(actress),
                magnet_link = VALUES(magnet_link),
                cover_image = VALUES(cover_image),
                updated_at = NOW()
        """
        columns = (
            'tid', 'fid', 'category_name', 'designation', 'actress', 'title',
            'magnet_link', 'cover_image', 'post_url', 'publish_date',
        )
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql, tuple(data[col] for col in columns))
            self.conn.commit()
            actress_log = f"[{data['actress']}] " if data['actress'] else ""
            logger.info(f"保存: {actress_log}{data['title'][:15]}...")
        except Error as e:
            logger.error(f"DB错误: {e}")
            if self.conn.is_connected():
                self.conn.rollback()
        finally:
            if cursor:
                cursor.close()

    # ================= Detail-page parsing =================
    def _extract_magnet(self, content_td):
        """Find a magnet URI in the post: code blocks first, then full text."""
        code_blocks = content_td.find_all('div', {'class': 'blockcode'})
        if not code_blocks:
            logger.debug("✗ 未找到div class='blockcode'")
        for block in code_blocks:
            block_text = block.get_text(strip=True)
            logger.debug(f"找到blockcode, 内容: {block_text[:100]}")
            match = MAGNET_RE.search(block_text)
            if match:
                magnet = match.group(1)
                logger.debug(f"✓ 提取到磁力链接: {magnet[:50]}...")
                return magnet
            logger.debug("✗ blockcode中未匹配到磁力链接")

        # Fallback: the link may sit in plain text outside any code block.
        match = MAGNET_RE.search(content_td.get_text())
        if match:
            magnet = self.clean_magnet(match.group(1))
            logger.debug(f"✓ 兜底提取到磁力链接: {magnet[:50]}...")
            return magnet
        logger.debug("✗ 整个文本中也未找到磁力链接")
        return ""

    def _extract_cover(self, content_td):
        """Return the first usable absolute image URL in the post body.

        For each <img>, ``zoomfile`` is preferred over ``file``; the
        ``none.gif`` placeholder is ignored.
        """
        def usable(url):
            return bool(
                url
                and url.startswith('http')
                and 'static/image/common/none.gif' not in url
            )

        imgs = content_td.find_all('img')
        logger.debug(f"内容容器中共有 {len(imgs)} 个img标签")
        for img in imgs:
            zoomfile = img.get('zoomfile')
            if usable(zoomfile):
                logger.debug(f"✓ 使用zoomfile作为封面: {zoomfile[:50]}...")
                return zoomfile
            file_attr = img.get('file')
            if usable(file_attr):
                logger.debug(f"✓ 使用file作为封面: {file_attr[:50]}...")
                return file_attr
        logger.debug("✗ 未找到合适的封面图")
        return ""

    def _extract_actress(self, content_td):
        """Return the first whitespace-separated name after the actress label."""
        text_content = content_td.get_text(separator='\n')
        match = ACTRESS_RE.search(text_content)
        # split() yields no tokens when only whitespace followed the label.
        tokens = match.group(1).split() if match else []
        if tokens:
            logger.debug(f"✓ 提取到女优: {tokens[0]}")
            return tokens[0]
        logger.debug("✗ 未匹配到女优信息")
        return ""

    def parse_detail_page(self, post_url):
        """Load a thread in the browser and extract its key fields.

        The page is fetched with Selenium rather than ``requests`` (the
        original author notes the latter got HTTP 403).

        Args:
            post_url: absolute URL of the thread.

        Returns:
            ``(magnet_link, cover_image, actress_in_body)``; each element is
            ``""`` when not found.  Exceptions are logged and whatever was
            extracted before the failure is still returned.
        """
        magnet_link = ""
        cover_image = ""
        actress_in_body = ""
        try:
            self.driver.get(post_url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            # Debug aid: the page title shows whether the thread really loaded.
            page_title = soup.find('title')
            logger.debug(
                f"详情页标题: {page_title.get_text() if page_title else 'None'}, URL: {post_url}"
            )

            # The first post's body: <td class="t_f">, else <td id="postmessage_*">.
            content_td = soup.find('td', {'class': 't_f'})
            if not content_td:
                content_td = soup.find(
                    'td', {'id': lambda x: x and x.startswith('postmessage_')}
                )

            if not content_td:
                logger.warning("✗ 未找到内容容器 (td class='t_f' 或 id='postmessage_*')")
                logger.debug(f"页面共有 {len(soup.find_all('td'))} 个td标签")
                return magnet_link, cover_image, actress_in_body
            logger.debug(f"✓ 找到内容容器: id={content_td.get('id', '')}")

            magnet_link = self._extract_magnet(content_td)
            cover_image = self._extract_cover(content_td)
            actress_in_body = self._extract_actress(content_td)
        except Exception as e:
            logger.error(f"详情页解析异常: {e}, URL: {post_url}")
        return magnet_link, cover_image, actress_in_body

    @staticmethod
    def _thread_raw_date(thread):
        """Return the date text of a thread row (``""`` if it has none).

        Uses the <span> inside the author cell's <em> when present, otherwise
        the <em> text itself.
        """
        by_cell = thread.find('td', {'class': 'by'})
        em = by_cell.find('em') if by_cell else None
        if em is None:
            return ""
        return (em.find('span') or em).get_text()

    def _process_thread(self, thread, fid, category_name):
        """Handle one thread row of a list page.

        Returns:
            ``True`` if the row is an old post skipped by the daily filter,
            ``False`` if it is from today (or the filter is off), ``None`` if
            the row has no title link and was ignored.
        """
        tid_str = thread.get('id', '').split('_')[-1]
        title_tag = thread.find('a', {'class': 's xst'})
        if not title_tag:
            return None
        title = title_tag.get_text().strip()

        publish_date = self.parse_relative_date(self._thread_raw_date(thread))
        logger.debug("publish_date: " + publish_date)

        if RUN_MODE == 'daily' and ONLY_CRAWL_TODAY and publish_date != self.today_str:
            return True

        designation = self.extract_designation(title)
        if self.check_exists(tid_str, designation):
            return False

        href = title_tag.get('href')
        if not href:
            return False
        # urljoin copes with relative, root-relative and absolute hrefs.
        full_url = urljoin(BASE_URL, href)
        logger.info("get url : " + full_url)

        magnet, cover, body_actress = self.parse_detail_page(full_url)
        logger.debug(f"magnet: {magnet}, cover: {cover}, body_actress: {body_actress}")

        # Prefer the name stated in the post body; otherwise guess from title.
        final_actress = body_actress or self.extract_actress_from_title(title, designation)

        if magnet:
            self.save_to_db({
                'tid': tid_str,
                'fid': fid,
                'category_name': category_name,
                'designation': designation,
                'actress': final_actress,
                'title': title,
                'magnet_link': magnet,
                'cover_image': cover,
                'post_url': full_url,
                'publish_date': publish_date,
            })
        return False

    def crawl_forum(self, fid, category_name):
        """Crawl up to ``MAX_PAGES_PER_FID`` list pages of one board.

        In daily mode the crawl of a board stops once more than 20 old posts
        have been seen in a row.  A page with no thread rows ends the board.

        Args:
            fid: numeric board id used in the list URL.
            category_name: label stored with every saved post.
        """
        logger.info(f"=== 开始板块: {category_name} (FID: {fid}) ===")
        consecutive_old_posts = 0

        for page in range(1, MAX_PAGES_PER_FID + 1):
            if RUN_MODE == 'daily' and consecutive_old_posts > 20:
                break

            list_url = f"https://www.sehuatang.net/forum-{fid}-{page}.html"
            logger.info(f"正在爬取第 {page} 页")
            try:
                self.driver.get(list_url)
                try:
                    WebDriverWait(self.driver, 10).until(
                        EC.presence_of_all_elements_located(
                            (By.CSS_SELECTOR, 'tbody[id^="normalthread"]')
                        )
                    )
                except TimeoutException:
                    # Parse whatever loaded; an empty page ends the board
                    # below instead of timing out on every remaining page.
                    logger.warning(f"第 {page} 页等待帖子列表超时")
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                threads = soup.find_all(
                    'tbody', {'id': lambda x: x and x.startswith('normalthread')}
                )
            except Exception as e:
                logger.error(f"列表页异常: {e}")
                continue

            if not threads:
                break

            for thread in threads:
                try:
                    is_old = self._process_thread(thread, fid, category_name)
                except Exception as e:
                    logger.error(f"单贴异常: {e}")
                    continue
                if is_old is True:
                    consecutive_old_posts += 1
                elif is_old is False and RUN_MODE == 'daily' and ONLY_CRAWL_TODAY:
                    consecutive_old_posts = 0

    def close(self):
        """Release the DB connection and the browser.

        Each resource is closed independently so a failure in one cannot
        leave the other open.
        """
        if self.conn:
            try:
                if self.conn.is_connected():
                    self.conn.close()
            except Exception as e:
                logger.warning(f"关闭数据库连接失败: {e}")
        if self.driver:
            try:
                self.driver.close()
            except Exception:
                pass
            try:
                self.driver.quit()
            except Exception:
                pass

    def run(self):
        """Pass the age gate, crawl every configured board, then clean up."""
        try:
            self.bypass_age_verification()
            for fid, name in TARGET_FIDS.items():
                self.crawl_forum(fid, name)
        finally:
            self.close()


if __name__ == "__main__":
    crawler = SehuatangCrawler()
    crawler.run()