sehuatang爬取脚本

This commit is contained in:
liuwei
2025-12-17 13:07:59 +08:00
parent cddb3e82ff
commit f1df8eb372

View File

@@ -0,0 +1,422 @@
import time
import os
import re
import mysql.connector
from mysql.connector import Error
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from loguru import logger
from datetime import datetime, timedelta
# ================= 配置区域 =================
# 运行模式: 'full' (全量) 或 'daily' (增量)
RUN_MODE = 'full'
DB_CONFIG = {
'host': '192.168.2.41',
'port': 3306,
'user': 'root', # 【请修改】数据库用户名
'password': 'lw123456', # 【请修改】数据库密码
'database': 'message_archive', # 【请修改】数据库名 (pymysql中是'db', 这里是'database')
'charset': 'utf8mb4',
'use_pure': True # 可选使用纯Python实现避免某些C扩展依赖问题
}
TARGET_FIDS = {
103: '高清中文字幕',
104: '素人有码系列',
37: '亚洲有码原创',
36: '亚洲无码原创',
39: '动漫原创',
160: 'vr',
151: '4k',
2: '国产原创',
38: '欧美无码',
107: '三级写真',
152: '韩国主播'
}
# 排除词库 (用于标题猜测的兜底逻辑)
BLACKLIST_KEYWORDS = {
'高清', '中文', '字幕', '无码', '有码', '步兵', '骑兵', '破解', '流出',
'合集', '系列', '下载', '推荐', '新作', '大片', '偷拍', '自拍', '精选',
'汉化', '日韩', '欧美', '国产', '主播', '成人', '视频', '影片', '最新',
'强力', '严选', '首发', '独家', '今日', '更新', '特辑', '精选', '重磅',
'清晰', '完整', '', '', '', '', '', '', '', ''
}
if RUN_MODE == 'full':
MAX_PAGES_PER_FID = 500
ONLY_CRAWL_TODAY = False
else:
MAX_PAGES_PER_FID = 5
ONLY_CRAWL_TODAY = True
class SehuatangCrawler:
def __init__(self):
self.conn = None
self._connect_db()
self._init_db_table()
self.driver = self._init_driver()
self.today_str = datetime.now().strftime('%Y-%m-%d')
def _connect_db(self):
try:
self.conn = mysql.connector.connect(**DB_CONFIG)
if self.conn.is_connected():
logger.info("数据库连接成功")
except Error as e:
logger.error(f"数据库连接失败: {e}")
raise
def _init_db_table(self):
create_table_sql = """
CREATE TABLE IF NOT EXISTS forum_posts (
id INT AUTO_INCREMENT PRIMARY KEY,
tid VARCHAR(50) NOT NULL UNIQUE COMMENT '帖子ID',
fid INT NOT NULL COMMENT '板块ID',
category_name VARCHAR(50) COMMENT '板块名称',
designation VARCHAR(100) COMMENT '番号',
actress VARCHAR(100) COMMENT '出演女优',
title VARCHAR(255) NOT NULL COMMENT '标题',
magnet_link TEXT COMMENT '磁力链接',
cover_image VARCHAR(500) COMMENT '封面图URL',
post_url VARCHAR(255) NOT NULL COMMENT '帖子链接',
publish_date VARCHAR(20) COMMENT '发布时间',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_designation (designation),
INDEX idx_actress (actress),
INDEX idx_publish_date (publish_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
cursor = None
try:
cursor = self.conn.cursor()
cursor.execute(create_table_sql)
self.conn.commit()
except Error as e:
logger.error(f"表结构初始化失败: {e}")
finally:
if cursor: cursor.close()
def _init_driver(self):
options = Options()
# options.add_argument('--headless')
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--blink-settings=imagesEnabled=false')
if os.name == 'nt':
chrome_driver_path = os.path.join(os.getcwd(), "utils", "chromedriver", "chromedriver.exe")
else:
chrome_driver_path = '/usr/bin/chromedriver'
try:
if os.name == 'nt' and not os.path.exists(chrome_driver_path):
chrome_driver_path = ChromeDriverManager().install()
service = Service(chrome_driver_path)
driver = webdriver.Chrome(service=service, options=options)
except Exception:
chrome_driver_path = ChromeDriverManager().install()
driver = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
return driver
def bypass_age_verification(self):
try:
self.driver.get("https://www.sehuatang.net/forum.php")
time.sleep(2)
try:
enter_button = self.driver.find_element(By.XPATH, '//a[contains(text(), "满18岁")]')
enter_button.click()
logger.success("通过年龄验证")
time.sleep(2)
except Exception:
pass
except Exception as e:
logger.warning(f"主页访问异常: {e}")
def parse_relative_date(self, date_str):
now = datetime.now()
date_str = date_str.strip()
try:
if "秒前" in date_str or "刚刚" in date_str:
return now.strftime('%Y-%m-%d')
elif "分钟前" in date_str:
minutes = int(re.search(r'(\d+)', date_str).group(1))
dt = now - timedelta(minutes=minutes)
return dt.strftime('%Y-%m-%d')
elif "小时前" in date_str:
hours = int(re.search(r'(\d+)', date_str).group(1))
dt = now - timedelta(hours=hours)
return dt.strftime('%Y-%m-%d')
elif "昨天" in date_str:
dt = now - timedelta(days=1)
return dt.strftime('%Y-%m-%d')
elif "前天" in date_str:
dt = now - timedelta(days=2)
return dt.strftime('%Y-%m-%d')
elif "天前" in date_str:
days = int(re.search(r'(\d+)', date_str).group(1))
dt = now - timedelta(days=days)
return dt.strftime('%Y-%m-%d')
else:
if re.match(r'\d{4}-\d{1,2}-\d{1,2}', date_str):
return date_str
match = re.match(r'(\d{1,2})-(\d{1,2})', date_str)
if match:
return f"{now.year}-{match.group(1).zfill(2)}-{match.group(2).zfill(2)}"
return now.strftime('%Y-%m-%d')
except Exception:
return now.strftime('%Y-%m-%d')
def extract_designation(self, title):
title = title.upper()
patterns = [
r'FC2-PPV-\d+',
r'\d{6}-\d{3}',
r'[A-Z]{2,5}-\d{3,5}',
r'T28-\d{3}',
r'[A-Z]{2,5}\d{3,5}'
]
for p in patterns:
match = re.search(p, title)
if match:
return match.group(0)
return ""
# ================= 策略B从标题猜测女优 (兜底方案) =================
def extract_actress_from_title(self, title, designation):
"""当详情页没写女优时,尝试从标题分析"""
if designation:
title = re.sub(re.escape(designation), '', title, flags=re.IGNORECASE)
title = re.sub(r'(?i)\[.*?\]|【.*?】|\(.*?\)|mp4|1080p|720p|4k', ' ', title)
clean_text = re.sub(r'[^\u4e00-\u9fa5\s]', ' ', title)
tokens = clean_text.split()
potential_names = []
for token in tokens:
token = token.strip()
if 2 <= len(token) <= 4:
if token not in BLACKLIST_KEYWORDS:
potential_names.append(token)
if potential_names:
return potential_names[0]
return ""
def check_exists(self, tid, designation):
cursor = None
try:
cursor = self.conn.cursor()
sql_tid = "SELECT id FROM forum_posts WHERE tid = %s LIMIT 1"
cursor.execute(sql_tid, (tid,))
if cursor.fetchone():
return True
if designation:
sql_des = "SELECT id FROM forum_posts WHERE designation = %s LIMIT 1"
cursor.execute(sql_des, (designation,))
if cursor.fetchone():
return True
return False
except Error:
return False
finally:
if cursor: cursor.close()
def clean_magnet(self, magnet_text):
if not magnet_text: return ""
magnet_text = magnet_text.replace('复制代码', '').strip()
match = re.search(r'(magnet:\?xt=urn:btih:[a-zA-Z0-9]+[^\s"\'<>\u4e00-\u9fa5]*)', magnet_text)
if match: return match.group(1)
return magnet_text
def save_to_db(self, data):
cursor = None
try:
cursor = self.conn.cursor(dictionary=True)
sql = """
INSERT INTO forum_posts
(tid, fid, category_name, designation, actress, title, magnet_link, cover_image, post_url, publish_date)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
actress = VALUES(actress),
magnet_link = VALUES(magnet_link),
cover_image = VALUES(cover_image),
updated_at = NOW()
"""
cursor.execute(sql, (
data['tid'], data['fid'], data['category_name'],
data['designation'], data['actress'], data['title'],
data['magnet_link'], data['cover_image'],
data['post_url'], data['publish_date']
))
self.conn.commit()
actress_log = f"[{data['actress']}] " if data['actress'] else ""
logger.info(f"保存: {actress_log}{data['title'][:15]}...")
except Error as e:
logger.error(f"DB错误: {e}")
if self.conn.is_connected(): self.conn.rollback()
finally:
if cursor: cursor.close()
# ================= 核心修改:详情页解析 =================
def parse_detail_page(self, post_url):
magnet_link = ""
cover_image = ""
actress_in_body = "" # 详情页提取到的女优
try:
self.driver.get(post_url)
time.sleep(1 if RUN_MODE == 'full' else 2)
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
content_div = soup.find('div', {'class': 't_fsz'})
if content_div:
# 1. 提取磁力链
magnet_tags = content_div.find_all('a', href=re.compile(r'^magnet:\?'))
for tag in magnet_tags:
href = tag.get('href', '')
if 'xt=urn:btih:' in href:
magnet_link = href
break
if not magnet_link:
text = content_div.get_text()
match = re.search(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]+.*', text)
if match: magnet_link = match.group(0)
magnet_link = self.clean_magnet(magnet_link)
# 2. 提取图片
imgs = content_div.find_all('img')
for img in imgs:
zoomfile = img.get('zoomfile')
if zoomfile and zoomfile.startswith('http'):
cover_image = zoomfile
break
file_attr = img.get('file')
if file_attr and file_attr.startswith('http'):
cover_image = file_attr
break
# 3. [新] 提取【出演女优】
# 使用 separator='\n' 保持换行,防止文字粘连
text_content = content_div.get_text(separator='\n')
# 正则匹配:支持 【】 或 [],支持冒号或空格
# 匹配逻辑:找 "女优" 关键词,后面跟冒号,再取剩下的一整行文字
actress_match = re.search(r'(?:【|\[)(?:出演)?女优(?:】|\])\s*[:]\s*(.*)', text_content)
if actress_match:
raw_actress = actress_match.group(1).strip()
# 再次清洗一下防止后面有HTML标签残留
actress_in_body = raw_actress.split('<')[0].strip()
except Exception:
pass
# 返回三个值
return magnet_link, cover_image, actress_in_body
def crawl_forum(self, fid, category_name):
logger.info(f"=== 开始板块: {category_name} (FID: {fid}) ===")
consecutive_old_posts = 0
for page in range(1, MAX_PAGES_PER_FID + 1):
if RUN_MODE == 'daily' and consecutive_old_posts > 20:
break
list_url = f"https://www.sehuatang.net/forum-{fid}-{page}.html"
logger.info(f"正在爬取第 {page}")
try:
self.driver.get(list_url)
time.sleep(0.5)
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
threads = soup.find_all('tbody', {'id': lambda x: x and x.startswith('normalthread')})
if not threads: break
for thread in threads:
try:
tid_str = thread.get('id', '').split('_')[-1]
title_tag = thread.find('a', {'class': 's xst'})
if not title_tag: continue
title = title_tag.get_text()
date_span = thread.find('td', {'class': 'by'}).find('em').find('span')
raw_date = date_span.get_text() if date_span else ""
publish_date = self.parse_relative_date(raw_date)
if RUN_MODE == 'daily' and ONLY_CRAWL_TODAY:
if publish_date != self.today_str:
consecutive_old_posts += 1
if "昨天" in raw_date or publish_date < self.today_str:
continue
continue
else:
consecutive_old_posts = 0
designation = self.extract_designation(title)
if self.check_exists(tid_str, designation):
continue
partial_url = title_tag.get('href')
full_url = f"https://www.sehuatang.net/{partial_url}"
# 获取详情页数据(含女优)
magnet, cover, body_actress = self.parse_detail_page(full_url)
# ================= 决策逻辑 =================
# 优先用详情页里抓到的 body_actress
# 如果没抓到,再尝试用标题分析
final_actress = body_actress if body_actress else self.extract_actress_from_title(title,
designation)
if magnet:
post_data = {
'tid': tid_str,
'fid': fid,
'category_name': category_name,
'designation': designation,
'actress': final_actress, # 最终决定的女优名
'title': title,
'magnet_link': magnet,
'cover_image': cover,
'post_url': full_url,
'publish_date': publish_date
}
self.save_to_db(post_data)
except Exception as e:
logger.error(f"单贴异常: {e}")
continue
except Exception as e:
logger.error(f"列表页异常: {e}")
def close(self):
if self.conn and self.conn.is_connected():
self.conn.close()
if self.driver:
self.driver.quit()
def run(self):
try:
self.bypass_age_verification()
for fid, name in TARGET_FIDS.items():
self.crawl_forum(fid, name)
finally:
self.close()
if __name__ == "__main__":
crawler = SehuatangCrawler()
crawler.run()