"""Selenium crawler that archives forum threads into a MySQL table.

The crawler walks the list pages of every board in ``TARGET_FIDS``, opens
each thread that is not stored yet, extracts the magnet link, the cover
image and the actress name, and upserts one row per thread into the
``forum_posts`` table.

Originally located at ``utils/sehuatang/sehuatang_bot.py``.
"""
import os
import re
import time
from datetime import datetime, timedelta
from urllib.parse import urljoin

import mysql.connector
from bs4 import BeautifulSoup
from loguru import logger
from mysql.connector import Error
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

# ================= Configuration =================

# Run mode: 'full' (crawl everything) or 'daily' (incremental).
RUN_MODE = 'full'

# Root of the forum; thread hrefs found on list pages are resolved against it.
BASE_URL = "https://www.sehuatang.net/"

# SECURITY: every connection value can be overridden through the environment
# so credentials do not have to live in the source tree.  The literal
# defaults are kept only so existing deployments keep working unchanged.
DB_CONFIG = {
    'host': os.environ.get('SEHUATANG_DB_HOST', '192.168.2.41'),
    'port': int(os.environ.get('SEHUATANG_DB_PORT', '3306')),
    'user': os.environ.get('SEHUATANG_DB_USER', 'root'),  # [change me] database user
    'password': os.environ.get('SEHUATANG_DB_PASSWORD', 'lw123456'),  # [change me] database password
    # [change me] database name (pymysql calls this key 'db'; here it is 'database')
    'database': os.environ.get('SEHUATANG_DB_NAME', 'message_archive'),
    'charset': 'utf8mb4',
    'use_pure': True,  # optional: pure-Python implementation, avoids C-extension issues
}

# Board id -> board name; every entry is crawled by ``SehuatangCrawler.run``.
TARGET_FIDS = {
    103: '高清中文字幕',
    104: '素人有码系列',
    37: '亚洲有码原创',
    36: '亚洲无码原创',
    39: '动漫原创',
    160: 'vr',
    151: '4k',
    2: '国产原创',
    38: '欧美无码',
    107: '三级写真',
    152: '韩国主播',
}

# Exclusion list used by the title-based actress guess (fallback strategy).
BLACKLIST_KEYWORDS = {
    '高清', '中文', '字幕', '无码', '有码', '步兵', '骑兵', '破解', '流出',
    '合集', '系列', '下载', '推荐', '新作', '大片', '偷拍', '自拍', '精选',
    '汉化', '日韩', '欧美', '国产', '主播', '成人', '视频', '影片', '最新',
    '强力', '严选', '首发', '独家', '今日', '更新', '特辑', '重磅',
    '清晰', '完整', '版', '部', '集', '天', '月', '年', '号', '期',
}

if RUN_MODE == 'full':
    MAX_PAGES_PER_FID = 500
    ONLY_CRAWL_TODAY = False
else:
    MAX_PAGES_PER_FID = 5
    ONLY_CRAWL_TODAY = True


class SehuatangCrawler:
    """Crawl the configured boards and store new threads in MySQL.

    Creating an instance opens the database connection, makes sure the
    ``forum_posts`` table exists and starts a Chrome WebDriver.  Call
    :meth:`run` to crawl every board; it always releases both resources.
    """

    # Designation (release code) patterns, tried in order; first hit wins.
    _DESIGNATION_PATTERNS = (
        r'FC2-PPV-\d+',
        r'\d{6}-\d{3}',
        r'[A-Z]{2,5}-\d{3,5}',
        r'T28-\d{3}',
        r'[A-Z]{2,5}\d{3,5}',
    )

    # Widths of the free-text columns declared in ``_init_db_table``.
    _TITLE_MAX_LEN = 255
    _ACTRESS_MAX_LEN = 100

    def __init__(self):
        # Both handles are defined up front so ``close`` is always safe to
        # call, even when start-up fails half-way through.
        self.conn = None
        self.driver = None
        self._connect_db()
        self._init_db_table()
        try:
            self.driver = self._init_driver()
        except Exception:
            # Do not leak the open DB connection when Chrome cannot start.
            self.close()
            raise
        self.today_str = datetime.now().strftime('%Y-%m-%d')

    def _connect_db(self):
        """Open the MySQL connection described by ``DB_CONFIG``.

        Raises:
            mysql.connector.Error: when the connection cannot be established.
        """
        try:
            self.conn = mysql.connector.connect(**DB_CONFIG)
            if self.conn.is_connected():
                logger.info("数据库连接成功")
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def _init_db_table(self):
        """Create the ``forum_posts`` table when it does not exist yet.

        A failure is logged but not raised, so the crawler still starts.
        """
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS forum_posts (
            id INT AUTO_INCREMENT PRIMARY KEY,
            tid VARCHAR(50) NOT NULL UNIQUE COMMENT '帖子ID',
            fid INT NOT NULL COMMENT '板块ID',
            category_name VARCHAR(50) COMMENT '板块名称',
            designation VARCHAR(100) COMMENT '番号',
            actress VARCHAR(100) COMMENT '出演女优',
            title VARCHAR(255) NOT NULL COMMENT '标题',
            magnet_link TEXT COMMENT '磁力链接',
            cover_image VARCHAR(500) COMMENT '封面图URL',
            post_url VARCHAR(255) NOT NULL COMMENT '帖子链接',
            publish_date VARCHAR(20) COMMENT '发布时间',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_designation (designation),
            INDEX idx_actress (actress),
            INDEX idx_publish_date (publish_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(create_table_sql)
            self.conn.commit()
        except Error as e:
            logger.error(f"表结构初始化失败: {e}")
        finally:
            if cursor:
                cursor.close()

    def _init_driver(self):
        """Start Chrome and return the WebDriver instance.

        A fixed chromedriver path is tried first (a bundled ``.exe`` on
        Windows, ``/usr/bin/chromedriver`` elsewhere).  When that fails, the
        driver is obtained through ``webdriver_manager`` instead.
        """
        options = Options()
        # options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--blink-settings=imagesEnabled=false')

        if os.name == 'nt':
            chrome_driver_path = os.path.join(
                os.getcwd(), "utils", "chromedriver", "chromedriver.exe")
        else:
            chrome_driver_path = '/usr/bin/chromedriver'

        try:
            if os.name == 'nt' and not os.path.exists(chrome_driver_path):
                chrome_driver_path = ChromeDriverManager().install()
            service = Service(chrome_driver_path)
            driver = webdriver.Chrome(service=service, options=options)
        except Exception:
            # Any start-up problem with the fixed path: retry with a driver
            # resolved by webdriver_manager.
            chrome_driver_path = ChromeDriverManager().install()
            driver = webdriver.Chrome(
                service=Service(chrome_driver_path), options=options)
        return driver

    def bypass_age_verification(self):
        """Open the forum home page and click the age-confirmation link.

        Best effort: a missing link is ignored, and a failure to load the
        home page is only logged.
        """
        try:
            self.driver.get(urljoin(BASE_URL, "forum.php"))
            time.sleep(2)
            try:
                enter_button = self.driver.find_element(
                    By.XPATH, '//a[contains(text(), "满18岁")]')
                enter_button.click()
                logger.success("通过年龄验证")
                time.sleep(2)
            except Exception:
                # The link is absent when no confirmation is being shown.
                pass
        except Exception as e:
            logger.warning(f"主页访问异常: {e}")

    def parse_relative_date(self, date_str):
        """Convert a date label from a list page into ``YYYY-MM-DD``.

        Handles relative labels (seconds/minutes/hours/days ago, yesterday,
        the day before yesterday), absolute ``YYYY-M-D`` dates and year-less
        ``M-D`` dates.  The result is always zero-padded and never carries a
        time part, so it can be compared as a string with ``today_str``.

        Args:
            date_str: Raw label text; ``None`` is treated as empty.

        Returns:
            The date as ``YYYY-MM-DD``; today's date when the label is empty
            or cannot be parsed.
        """
        now = datetime.now()
        today = now.strftime('%Y-%m-%d')
        date_str = (date_str or "").strip()
        try:
            if "秒前" in date_str or "刚刚" in date_str:
                return today

            if "分钟前" in date_str:
                amount = int(re.search(r'(\d+)', date_str).group(1))
                return (now - timedelta(minutes=amount)).strftime('%Y-%m-%d')
            if "小时前" in date_str:
                amount = int(re.search(r'(\d+)', date_str).group(1))
                return (now - timedelta(hours=amount)).strftime('%Y-%m-%d')
            if "昨天" in date_str:
                return (now - timedelta(days=1)).strftime('%Y-%m-%d')
            if "前天" in date_str:
                return (now - timedelta(days=2)).strftime('%Y-%m-%d')
            if "天前" in date_str:
                amount = int(re.search(r'(\d+)', date_str).group(1))
                return (now - timedelta(days=amount)).strftime('%Y-%m-%d')

            # Absolute date.  Normalise it instead of returning the raw text:
            # unpadded values ("2024-1-5") or a trailing time would otherwise
            # never compare equal to today's zero-padded date.
            full = re.match(r'(\d{4})-(\d{1,2})-(\d{1,2})', date_str)
            if full:
                year, month, day = full.groups()
                return f"{year}-{month.zfill(2)}-{day.zfill(2)}"

            short = re.match(r'(\d{1,2})-(\d{1,2})', date_str)
            if short:
                return f"{now.year}-{short.group(1).zfill(2)}-{short.group(2).zfill(2)}"
            return today
        except (AttributeError, ValueError, OverflowError):
            # No number in a relative label, or a number too large for
            # timedelta: fall back to today.
            return today

    def extract_designation(self, title):
        """Return the first designation (release code) found in *title*.

        The title is upper-cased before matching.

        Returns:
            The matched code, or ``""`` when no pattern matches.
        """
        title = title.upper()
        for pattern in self._DESIGNATION_PATTERNS:
            match = re.search(pattern, title)
            if match:
                return match.group(0)
        return ""

    # ========== Strategy B: guess the actress from the title (fallback) ==========
    def extract_actress_from_title(self, title, designation):
        """Guess the actress name from the title.

        Used when the detail page does not state the actress.  The
        designation, bracketed segments and common format tags are removed,
        everything that is not a CJK character is dropped, and the first
        remaining token of 2 to 4 characters that is not in
        ``BLACKLIST_KEYWORDS`` is returned.

        Returns:
            The guessed name, or ``""`` when no candidate remains.
        """
        if designation:
            title = re.sub(re.escape(designation), '', title, flags=re.IGNORECASE)
        title = re.sub(r'(?i)\[.*?\]|【.*?】|\(.*?\)|mp4|1080p|720p|4k', ' ', title)
        clean_text = re.sub(r'[^\u4e00-\u9fa5\s]', ' ', title)
        for token in clean_text.split():
            if 2 <= len(token) <= 4 and token not in BLACKLIST_KEYWORDS:
                return token
        return ""

    def check_exists(self, tid, designation):
        """Tell whether a thread is already stored.

        A thread counts as stored when a row with the same ``tid`` exists,
        or, when *designation* is non-empty, a row with that designation.

        Returns:
            ``True`` when a matching row exists.  ``False`` otherwise, and
            also when the lookup fails (the failure is logged).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(
                "SELECT id FROM forum_posts WHERE tid = %s LIMIT 1", (tid,))
            if cursor.fetchone():
                return True
            if designation:
                cursor.execute(
                    "SELECT id FROM forum_posts WHERE designation = %s LIMIT 1",
                    (designation,))
                if cursor.fetchone():
                    return True
            return False
        except Error as e:
            logger.warning(f"查重失败: {e}")
            return False
        finally:
            if cursor:
                cursor.close()

    def clean_magnet(self, magnet_text):
        """Reduce *magnet_text* to the bare magnet URI.

        Removes the "copy code" caption and cuts the link at the first
        whitespace, quote, angle bracket or CJK character.

        Returns:
            The magnet URI; the stripped input when no URI is found; ``""``
            for empty input.
        """
        if not magnet_text:
            return ""
        magnet_text = magnet_text.replace('复制代码', '').strip()
        match = re.search(
            r'(magnet:\?xt=urn:btih:[a-zA-Z0-9]+[^\s"\'<>\u4e00-\u9fa5]*)',
            magnet_text)
        return match.group(1) if match else magnet_text

    @staticmethod
    def _clip(value, limit):
        """Return *value* cut to *limit* characters when it is a string."""
        return value[:limit] if isinstance(value, str) else value

    def save_to_db(self, data):
        """Insert one thread, or refresh it when its ``tid`` already exists.

        Args:
            data: Mapping with the keys ``tid``, ``fid``, ``category_name``,
                ``designation``, ``actress``, ``title``, ``magnet_link``,
                ``cover_image``, ``post_url`` and ``publish_date``.

        Database errors are logged and the transaction is rolled back; they
        are not raised.
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            sql = """
            INSERT INTO forum_posts
            (tid, fid, category_name, designation, actress, title, magnet_link, cover_image, post_url, publish_date)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                title = VALUES(title),
                actress = VALUES(actress),
                magnet_link = VALUES(magnet_link),
                cover_image = VALUES(cover_image),
                updated_at = NOW()
            """
            # Free-text fields are cut to their column width so one long
            # value cannot make the whole insert fail under strict SQL mode.
            cursor.execute(sql, (
                data['tid'], data['fid'], data['category_name'],
                data['designation'],
                self._clip(data['actress'], self._ACTRESS_MAX_LEN),
                self._clip(data['title'], self._TITLE_MAX_LEN),
                data['magnet_link'], data['cover_image'],
                data['post_url'], data['publish_date'],
            ))
            self.conn.commit()
            actress_log = f"[{data['actress']}] " if data['actress'] else ""
            logger.info(f"保存: {actress_log}{data['title'][:15]}...")
        except Error as e:
            logger.error(f"DB错误: {e}")
            if self.conn.is_connected():
                self.conn.rollback()
        finally:
            if cursor:
                cursor.close()

    @staticmethod
    def _extract_actress_from_text(text_content):
        """Return the value of the actress field in the post text.

        Matches a ``女优`` / ``出演女优`` label wrapped in ``【】`` or ``[]``,
        followed by a full-width or ASCII colon, and takes the rest of the
        line.

        Returns:
            The actress text, or ``""`` when the field is missing or empty.
        """
        match = re.search(
            r'(?:【|\[)(?:出演)?女优(?:】|\])\s*[::]\s*(.*)', text_content)
        if not match:
            return ""
        # Cut at '<' in case markup remnants follow the name.
        value = match.group(1).strip().split('<')[0].strip()
        # When the field is empty, the whitespace skip runs on into the next
        # line, which is the next field's label; that is not an actress.
        if value.startswith(('【', '[')):
            return ""
        return value

    # ================= Detail page parsing =================
    def parse_detail_page(self, post_url):
        """Load a thread page and extract its magnet link, cover and actress.

        Returns:
            A ``(magnet_link, cover_image, actress_in_body)`` tuple of
            strings.  A value is ``""`` when it was not found.  When loading
            or parsing fails, the failure is logged and whatever was
            extracted so far is returned.
        """
        magnet_link = ""
        cover_image = ""
        actress_in_body = ""  # actress as stated on the detail page

        try:
            self.driver.get(post_url)
            time.sleep(1 if RUN_MODE == 'full' else 2)

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            content_div = soup.find('div', {'class': 't_fsz'})

            if content_div:
                # 1. Magnet link: prefer a real <a href="magnet:...">.
                for tag in content_div.find_all('a', href=re.compile(r'^magnet:\?')):
                    href = tag.get('href', '')
                    if 'xt=urn:btih:' in href:
                        magnet_link = href
                        break
                if not magnet_link:
                    # Otherwise look for a magnet URI in the plain text.
                    match = re.search(
                        r'magnet:\?xt=urn:btih:[a-zA-Z0-9]+.*',
                        content_div.get_text())
                    if match:
                        magnet_link = match.group(0)
                magnet_link = self.clean_magnet(magnet_link)

                # 2. Cover image: first <img> with an absolute URL in its
                #    'zoomfile' or 'file' attribute.
                for img in content_div.find_all('img'):
                    zoomfile = img.get('zoomfile')
                    if zoomfile and zoomfile.startswith('http'):
                        cover_image = zoomfile
                        break
                    file_attr = img.get('file')
                    if file_attr and file_attr.startswith('http'):
                        cover_image = file_attr
                        break

                # 3. Actress field.  separator='\n' keeps line breaks so
                #    adjacent text nodes do not run together.
                actress_in_body = self._extract_actress_from_text(
                    content_div.get_text(separator='\n'))

        except Exception as e:
            # Best effort: one broken page must not stop the crawl, but the
            # failure should be visible in the log.
            logger.warning(f"详情页解析异常: {post_url} - {e}")

        return magnet_link, cover_image, actress_in_body

    @staticmethod
    def _extract_raw_date(thread):
        """Return the date label of a list-page thread row.

        Reads the ``<em>`` inside the first ``td.by`` cell and prefers its
        ``<span>`` child; when there is no ``<span>``, the ``<em>`` text
        itself is used.

        Returns:
            The label text, or ``None`` when the cell or ``<em>`` is missing.
        """
        by_cell = thread.find('td', {'class': 'by'})
        em_tag = by_cell.find('em') if by_cell is not None else None
        if em_tag is None:
            return None
        date_span = em_tag.find('span')
        date_node = date_span if date_span is not None else em_tag
        return date_node.get_text()

    def crawl_forum(self, fid, category_name):
        """Crawl the list pages of one board and store every new thread.

        At most ``MAX_PAGES_PER_FID`` pages are visited.  Crawling stops at
        the first page without threads.  In daily mode only threads dated
        today are processed, and crawling stops once more than 20
        consecutive older threads have been seen.

        Args:
            fid: Board id used in the list-page URL.
            category_name: Board name stored with each row.
        """
        logger.info(f"=== 开始板块: {category_name} (FID: {fid}) ===")
        consecutive_old_posts = 0

        for page in range(1, MAX_PAGES_PER_FID + 1):
            if RUN_MODE == 'daily' and consecutive_old_posts > 20:
                break

            list_url = urljoin(BASE_URL, f"forum-{fid}-{page}.html")
            logger.info(f"正在爬取第 {page} 页")

            try:
                self.driver.get(list_url)
                time.sleep(0.5)

                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                threads = soup.find_all(
                    'tbody', {'id': lambda x: x and x.startswith('normalthread')})
                if not threads:
                    break

                for thread in threads:
                    try:
                        tid_str = thread.get('id', '').split('_')[-1]
                        title_tag = thread.find('a', {'class': 's xst'})
                        if not title_tag:
                            continue
                        title = title_tag.get_text().strip()

                        raw_date = self._extract_raw_date(thread)
                        if raw_date is None:
                            logger.error(f"单贴异常: 未找到发布时间 (tid={tid_str})")
                            continue
                        publish_date = self.parse_relative_date(raw_date)

                        if RUN_MODE == 'daily' and ONLY_CRAWL_TODAY:
                            if publish_date != self.today_str:
                                consecutive_old_posts += 1
                                continue
                            consecutive_old_posts = 0

                        designation = self.extract_designation(title)
                        if self.check_exists(tid_str, designation):
                            continue

                        partial_url = title_tag.get('href')
                        if not partial_url:
                            continue
                        # urljoin copes with relative, root-relative and
                        # absolute hrefs alike.
                        full_url = urljoin(BASE_URL, partial_url)

                        # Detail page data (includes the actress field).
                        magnet, cover, body_actress = self.parse_detail_page(full_url)

                        # Prefer the actress stated on the detail page; fall
                        # back to the title-based guess.
                        final_actress = body_actress or self.extract_actress_from_title(
                            title, designation)

                        # Threads without a magnet link are not stored.
                        if magnet:
                            self.save_to_db({
                                'tid': tid_str,
                                'fid': fid,
                                'category_name': category_name,
                                'designation': designation,
                                'actress': final_actress,
                                'title': title,
                                'magnet_link': magnet,
                                'cover_image': cover,
                                'post_url': full_url,
                                'publish_date': publish_date,
                            })

                    except Exception as e:
                        logger.error(f"单贴异常: {e}")
                        continue

            except Exception as e:
                logger.error(f"列表页异常: {e}")

    def close(self):
        """Close the database connection and quit the browser.

        The browser is quit even when closing the connection fails.
        """
        try:
            if self.conn and self.conn.is_connected():
                self.conn.close()
        except Error as e:
            logger.warning(f"数据库关闭异常: {e}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None

    def run(self):
        """Crawl every board in ``TARGET_FIDS``, then release all resources."""
        try:
            self.bypass_age_verification()
            for fid, name in TARGET_FIDS.items():
                self.crawl_forum(fid, name)
        finally:
            self.close()


if __name__ == "__main__":
    crawler = SehuatangCrawler()
    crawler.run()