473 lines
20 KiB
Python
473 lines
20 KiB
Python
import time
|
||
import os
|
||
import re
|
||
import requests
|
||
import mysql.connector
|
||
from mysql.connector import Error
|
||
from selenium.webdriver.common.by import By
|
||
import undetected_chromedriver as uc
|
||
# Windows only: replace undetected_chromedriver's Chrome destructor with a
# no-op. The patch is best-effort, so a failure to apply it is ignored.
# NOTE(review): presumably this silences errors raised by Chrome.__del__ at
# interpreter shutdown on Windows — confirm against the library's behaviour.
if os.name == 'nt':
    try:
        setattr(uc.Chrome, '__del__', lambda self: None)
    except Exception:
        pass
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from bs4 import BeautifulSoup
|
||
from loguru import logger
|
||
from datetime import datetime, timedelta
|
||
|
||
# ================= Configuration =================

# Run mode: 'full' (crawl everything) or 'daily' (incremental).
RUN_MODE = 'daily'

# MySQL connection settings. Host, user, password and database name can be
# overridden with the CRAWLER_DB_HOST / CRAWLER_DB_USER / CRAWLER_DB_PASSWORD /
# CRAWLER_DB_NAME environment variables so that credentials do not have to be
# edited into (and committed with) this file. The literals are fallbacks only.
DB_CONFIG = {
    'host': os.environ.get('CRAWLER_DB_HOST', '192.168.2.41'),
    'port': 3306,
    'user': os.environ.get('CRAWLER_DB_USER', 'root'),
    'password': os.environ.get('CRAWLER_DB_PASSWORD', 'lw123456'),
    # The key is 'database' for mysql.connector (pymysql calls it 'db').
    'database': os.environ.get('CRAWLER_DB_NAME', 'message_archive'),
    'charset': 'utf8mb4',
    # Optional: use the pure-Python implementation to avoid C-extension
    # dependency problems.
    'use_pure': True,
}
|
||
|
||
|
||
# Boards to crawl, as (board id, board name) pairs. The pair order is the
# order in which the boards are visited.
TARGET_FIDS = dict((
    (103, '高清中文字幕'),
    (104, '素人有码系列'),
    (37, '亚洲有码原创'),
    (36, '亚洲无码原创'),
    (39, '动漫原创'),
    (160, 'vr'),
    (151, '4k'),
    (2, '国产原创'),
    (38, '欧美无码'),
    (107, '三级写真'),
    (152, '韩国主播'),
))
|
||
|
||
# Stop-word list used by the title-based fallback guess: a title token that
# appears here is never taken as a name.
BLACKLIST_KEYWORDS = set(
    "高清 中文 字幕 无码 有码 步兵 骑兵 破解 流出 "
    "合集 系列 下载 推荐 新作 大片 偷拍 自拍 精选 "
    "汉化 日韩 欧美 国产 主播 成人 视频 影片 最新 "
    "强力 严选 首发 独家 今日 更新 特辑 重磅 "
    "清晰 完整 版 部 集 天 月 年 号 期".split()
)
|
||
|
||
# Limits derived from the run mode: a full run walks up to 500 list pages per
# board regardless of date; any other mode walks 5 pages and keeps only posts
# dated today.
MAX_PAGES_PER_FID, ONLY_CRAWL_TODAY = (500, False) if RUN_MODE == 'full' else (5, True)
|
||
|
||
|
||
class SehuatangCrawler:
    """Selenium-driven forum crawler that archives post metadata into MySQL.

    ``run`` passes the site's age gate and then walks the list pages of every
    board in ``TARGET_FIDS``. For each thread that is not stored yet, the
    detail page is opened, the magnet link, cover image and actress name are
    extracted, and one row is upserted into the ``forum_posts`` table.
    """

    # Site root; list and thread URLs are built relative to it.
    BASE_URL = "https://www.sehuatang.net/"

    # A magnet URI: the btih hash plus trailing parameters, ending at the first
    # whitespace, quote, angle bracket or CJK character.
    _MAGNET_RE = re.compile(r'(magnet:\?xt=urn:btih:[a-zA-Z0-9]+[^\s"\'<>\u4e00-\u9fa5]*)')

    # Designation patterns, tried in this order against the upper-cased title.
    _DESIGNATION_RES = tuple(re.compile(p) for p in (
        r'FC2-PPV-\d+',
        r'\d{6}-\d{3}',
        r'[A-Z]{2,5}-\d{3,5}',
        r'T28-\d{3}',
        r'[A-Z]{2,5}\d{3,5}',
    ))

    # Placeholder image path that must not be taken as a cover.
    _PLACEHOLDER_IMAGE = 'static/image/common/none.gif'

    def __init__(self):
        """Connect to MySQL, make sure the table exists and start the browser.

        Raises:
            mysql.connector.Error: if the database connection cannot be made.
            Exception: anything raised while starting the browser; the
                database connection is closed before re-raising.
        """
        # Assign every attribute first so close() is safe after a partial init.
        self.conn = None
        self.driver = None
        self.session = None
        self.today_str = datetime.now().strftime('%Y-%m-%d')

        self._connect_db()
        try:
            self._init_db_table()
            self.driver = self._init_driver()
        except Exception:
            # Do not leak the open connection when start-up fails half-way.
            self.close()
            raise

    def _connect_db(self):
        """Open the MySQL connection described by ``DB_CONFIG``.

        Raises:
            mysql.connector.Error: re-raised after being logged.
        """
        try:
            self.conn = mysql.connector.connect(**DB_CONFIG)
            if self.conn.is_connected():
                logger.info("数据库连接成功")
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def _init_db_table(self):
        """Create the ``forum_posts`` table if it does not exist yet.

        A database error is logged and not re-raised.
        """
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS forum_posts (
            id INT AUTO_INCREMENT PRIMARY KEY,
            tid VARCHAR(50) NOT NULL UNIQUE COMMENT '帖子ID',
            fid INT NOT NULL COMMENT '板块ID',
            category_name VARCHAR(50) COMMENT '板块名称',
            designation VARCHAR(100) COMMENT '番号',
            actress VARCHAR(100) COMMENT '出演女优',
            title VARCHAR(255) NOT NULL COMMENT '标题',
            magnet_link TEXT COMMENT '磁力链接',
            cover_image VARCHAR(500) COMMENT '封面图URL',
            post_url VARCHAR(255) NOT NULL COMMENT '帖子链接',
            publish_date VARCHAR(20) COMMENT '发布时间',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_designation (designation),
            INDEX idx_actress (actress),
            INDEX idx_publish_date (publish_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(create_table_sql)
            self.conn.commit()
        except Error as e:
            logger.error(f"表结构初始化失败: {e}")
        finally:
            if cursor:
                cursor.close()

    def _init_driver(self):
        """Start and return a non-headless undetected-chromedriver Chrome."""
        options = uc.ChromeOptions()
        # Settings intended to keep the browser from being flagged as automated.
        options.headless = False
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument('--disable-dev-shm-usage')
        # If headless mode still triggers detection, run with headless=False
        # the first time and pass the check by hand.
        return uc.Chrome(options=options)

    def bypass_age_verification(self):
        """Open the home page, click through the age gate, mirror the session.

        Two link texts are tried in turn (6 s, then 4 s wait); if neither is
        found the gate is assumed absent. Afterwards ``self.session`` becomes a
        ``requests.Session`` carrying the browser's user agent and cookies.
        Any failure is logged as a warning and not raised.
        """
        gate_links = (
            ('//a[contains(text(), "满18岁,请点此进入")]', 6),
            ('//a[contains(text(), "满18岁")]', 4),
        )
        try:
            self.driver.get(self.BASE_URL)
            for xpath, timeout in gate_links:
                try:
                    button = WebDriverWait(self.driver, timeout).until(
                        EC.element_to_be_clickable((By.XPATH, xpath)))
                    button.click()
                    logger.success("通过年龄验证")
                    break
                except Exception:
                    # Link not present/clickable: try the next variant.
                    continue

            ua = self.driver.execute_script("return navigator.userAgent")
            self.session = requests.Session()
            self.session.headers.update({'User-Agent': ua, 'Referer': self.BASE_URL})
            for c in self.driver.get_cookies():
                try:
                    self.session.cookies.set(
                        c['name'], c['value'],
                        domain=c.get('domain'), path=c.get('path', '/'))
                except Exception:
                    # Fall back to a cookie without domain/path scoping.
                    self.session.cookies.set(c['name'], c['value'])
        except Exception as e:
            logger.warning(f"主页访问异常: {e}")

    @staticmethod
    def _first_int(text):
        """Return the first run of digits in *text* as an int.

        Raises:
            ValueError: if *text* contains no digit.
        """
        found = re.search(r'\d+', text)
        if found is None:
            raise ValueError(f"no number in {text!r}")
        return int(found.group(0))

    def parse_relative_date(self, date_str):
        """Convert a forum date label into a zero-padded ``YYYY-MM-DD`` string.

        Understands relative labels (seconds/minutes/hours/days ago, yesterday,
        the day before yesterday), absolute ``YYYY-M-D`` dates (anything after
        the day, such as a time, is dropped) and bare ``M-D`` dates. A bare
        ``M-D`` that would lie in the future is taken from the previous year.
        Unrecognised or unparsable input yields today's date.

        Args:
            date_str: label text as shown on the list page; may be empty.

        Returns:
            The date as ``YYYY-MM-DD``.
        """
        now = datetime.now()
        today = now.strftime('%Y-%m-%d')
        text = (date_str or "").strip()

        def ago(**delta):
            return (now - timedelta(**delta)).strftime('%Y-%m-%d')

        try:
            if "秒前" in text or "刚刚" in text:
                return today
            if "分钟前" in text:
                return ago(minutes=self._first_int(text))
            if "小时前" in text:
                return ago(hours=self._first_int(text))
            if "昨天" in text:
                return ago(days=1)
            if "前天" in text:
                return ago(days=2)
            if "天前" in text:
                return ago(days=self._first_int(text))

            full = re.match(r'(\d{4})-(\d{1,2})-(\d{1,2})', text)
            if full:
                # Normalise so the result compares correctly with today_str.
                year, month, day = (int(part) for part in full.groups())
                return f"{year:04d}-{month:02d}-{day:02d}"

            short = re.match(r'(\d{1,2})-(\d{1,2})', text)
            if short:
                month, day = int(short.group(1)), int(short.group(2))
                # A post cannot be dated in the future: roll back one year.
                in_future = (month, day) > (now.month, now.day)
                year = now.year - 1 if in_future else now.year
                return f"{year}-{month:02d}-{day:02d}"
            return today
        except (ValueError, OverflowError):
            # No number in a relative label, or an out-of-range offset.
            return today

    def extract_designation(self, title):
        """Return the first designation code found in *title*, upper-cased.

        Returns "" when no pattern matches.
        """
        upper_title = title.upper()
        for pattern in self._DESIGNATION_RES:
            found = pattern.search(upper_title)
            if found:
                return found.group(0)
        return ""

    # ============ Strategy B: guess the actress from the title (fallback) ============
    def extract_actress_from_title(self, title, designation):
        """Guess a name from the title when the detail page does not give one.

        The designation, bracketed segments and common format tags are removed;
        the first remaining run of 2-4 Chinese characters that is not in
        ``BLACKLIST_KEYWORDS`` is returned, or "" if there is none.
        """
        if designation:
            title = re.sub(re.escape(designation), '', title, flags=re.IGNORECASE)
        title = re.sub(r'(?i)\[.*?\]|【.*?】|\(.*?\)|mp4|1080p|720p|4k', ' ', title)
        chinese_only = re.sub(r'[^\u4e00-\u9fa5\s]', ' ', title)
        candidates = (
            token for token in chinese_only.split()
            if 2 <= len(token) <= 4 and token not in BLACKLIST_KEYWORDS
        )
        return next(candidates, "")

    def check_exists(self, tid, designation):
        """Tell whether a post with this thread id or designation is stored.

        Args:
            tid: thread id.
            designation: designation code; an empty value skips that lookup.

        Returns:
            True if a matching row exists. False otherwise, including when the
            query fails (the failure is logged).
        """
        lookups = [("SELECT id FROM forum_posts WHERE tid = %s LIMIT 1", tid)]
        if designation:
            lookups.append(
                ("SELECT id FROM forum_posts WHERE designation = %s LIMIT 1", designation))

        cursor = None
        try:
            cursor = self.conn.cursor()
            for sql, value in lookups:
                cursor.execute(sql, (value,))
                if cursor.fetchone():
                    return True
            return False
        except Error as e:
            logger.error(f"查重失败: {e}")
            return False
        finally:
            if cursor:
                cursor.close()

    def clean_magnet(self, magnet_text):
        """Strip page clutter from *magnet_text* and return the bare magnet URI.

        The "copy code" button label is removed first. If no magnet URI is
        recognised, the stripped text is returned unchanged; empty input
        returns "".
        """
        if not magnet_text:
            return ""
        magnet_text = magnet_text.replace('复制代码', '').strip()
        found = self._MAGNET_RE.search(magnet_text)
        return found.group(1) if found else magnet_text

    def save_to_db(self, data):
        """Insert one post, or refresh its mutable columns if the tid exists.

        Args:
            data: dict with the keys tid, fid, category_name, designation,
                actress, title, magnet_link, cover_image, post_url and
                publish_date.

        A database error is logged and the transaction rolled back; nothing is
        raised for it.
        """
        sql = """
        INSERT INTO forum_posts
        (tid, fid, category_name, designation, actress, title, magnet_link, cover_image, post_url, publish_date)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        title = VALUES(title),
        actress = VALUES(actress),
        magnet_link = VALUES(magnet_link),
        cover_image = VALUES(cover_image),
        updated_at = NOW()
        """
        columns = ('tid', 'fid', 'category_name', 'designation', 'actress',
                   'title', 'magnet_link', 'cover_image', 'post_url', 'publish_date')
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql, tuple(data[column] for column in columns))
            self.conn.commit()
            actress_log = f"[{data['actress']}] " if data['actress'] else ""
            logger.info(f"保存: {actress_log}{data['title'][:15]}...")
        except Error as e:
            logger.error(f"DB错误: {e}")
            if self.conn.is_connected():
                self.conn.rollback()
        finally:
            if cursor:
                cursor.close()

    def _extract_magnet(self, content_td):
        """Return the magnet link in the post body, or "" if there is none."""
        magnet_link = ""

        # Preferred source: the plain text of the first <div class="blockcode">.
        code_box = content_td.find('div', {'class': 'blockcode'})
        if code_box is not None:
            code_text = code_box.get_text(strip=True)
            logger.debug(f"找到blockcode, 内容: {code_text[:100]}")
            found = self._MAGNET_RE.search(code_text)
            if found:
                magnet_link = found.group(1)
                logger.debug(f"✓ 提取到磁力链接: {magnet_link[:50]}...")
            else:
                logger.debug("✗ blockcode中未匹配到磁力链接")
        else:
            logger.debug("✗ 未找到div class='blockcode'")

        # Fallback: search the whole post text.
        if not magnet_link:
            found = re.search(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]+.*', content_td.get_text())
            if found:
                magnet_link = self.clean_magnet(found.group(0))
                logger.debug(f"✓ 兜底提取到磁力链接: {magnet_link[:50]}...")
            else:
                logger.debug("✗ 整个文本中也未找到磁力链接")
        return magnet_link

    def _extract_cover(self, content_td):
        """Return the first usable image URL in the post body, or ""."""
        images = content_td.find_all('img')
        logger.debug(f"内容容器中共有 {len(images)} 个img标签")
        for img in images:
            # 'zoomfile' takes precedence over 'file' on the same tag.
            for attr in ('zoomfile', 'file'):
                url = img.get(attr)
                if url and url.startswith('http') and self._PLACEHOLDER_IMAGE not in url:
                    logger.debug(f"✓ 使用{attr}作为封面: {url[:50]}...")
                    return url
        logger.debug("✗ 未找到合适的封面图")
        return ""

    def _extract_actress(self, content_td):
        """Return the name following the "actress" label in the body, or ""."""
        body_text = content_td.get_text(separator='\n')
        # Accept both the ASCII and the full-width colon after the label.
        found = re.search(
            r'(?:【|\[)(?:出演)?女优(?:】|\])\s*[::]\s*([^\n<]+)', body_text)
        # Keep only the first whitespace-separated word; the capture may be
        # blank, in which case there is nothing to return.
        words = found.group(1).split() if found else []
        if not words:
            logger.debug("✗ 未匹配到女优信息")
            return ""
        logger.debug(f"✓ 提取到女优: {words[0]}")
        return words[0]

    # ================= Detail-page parsing =================
    def parse_detail_page(self, post_url):
        """Load a thread page in the browser and pull out its key fields.

        The page is fetched with Selenium rather than ``requests`` (the
        original author notes this avoids HTTP 403 responses).

        Args:
            post_url: absolute URL of the thread.

        Returns:
            Tuple ``(magnet_link, cover_image, actress)``. A field that is not
            found is "". On an exception the error is logged and whatever was
            extracted so far is returned.
        """
        magnet_link = ""
        cover_image = ""
        actress_in_body = ""

        try:
            self.driver.get(post_url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body')))
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            # Log the page title to confirm the page really loaded.
            page_title = soup.find('title')
            logger.debug(f"详情页标题: {page_title.get_text() if page_title else 'None'}, URL: {post_url}")

            # Post body: <td class="t_f">, else a <td> whose id starts with
            # "postmessage_".
            content_td = soup.find('td', {'class': 't_f'})
            if content_td is None:
                content_td = soup.find('td', {'id': lambda x: x and x.startswith('postmessage_')})

            if content_td is None:
                logger.warning("✗ 未找到内容容器 (td class='t_f' 或 id='postmessage_*')")
                logger.debug(f"页面共有 {len(soup.find_all('td'))} 个td标签")
                return magnet_link, cover_image, actress_in_body
            logger.debug(f"✓ 找到内容容器: id={content_td.get('id', '')}")

            magnet_link = self._extract_magnet(content_td)
            cover_image = self._extract_cover(content_td)
            actress_in_body = self._extract_actress(content_td)
        except Exception as e:
            logger.error(f"详情页解析异常: {e}, URL: {post_url}")

        return magnet_link, cover_image, actress_in_body

    @staticmethod
    def _thread_date_text(thread):
        """Return the date label of a list-page thread row.

        Returns the text of the first ``<span>`` inside ``td.by > em``, or the
        ``<em>`` text when there is no span. Returns None when the row has no
        such cell at all.
        """
        by_cell = thread.find('td', {'class': 'by'})
        em = by_cell.find('em') if by_cell is not None else None
        if em is None:
            return None
        span = em.find('span')
        return (span if span is not None else em).get_text()

    def _archive_thread(self, thread, title_tag, fid, category_name, publish_date):
        """Fetch one thread's detail page and store it if it has a magnet link.

        Threads already stored (same tid or same designation) are skipped
        without loading the detail page.
        """
        # Local import keeps this class self-contained for the new dependency.
        from urllib.parse import urljoin

        tid_str = thread.get('id', '').split('_')[-1]
        title = title_tag.get_text()
        designation = self.extract_designation(title)
        if self.check_exists(tid_str, designation):
            return

        partial_url = title_tag.get('href')
        if not partial_url:
            logger.warning(f"帖子缺少链接: tid={tid_str}")
            return
        # urljoin also copes with absolute or root-relative hrefs.
        full_url = urljoin(self.BASE_URL, partial_url)
        logger.info("get url : " + full_url)

        magnet, cover, body_actress = self.parse_detail_page(full_url)
        logger.debug(f"magnet: {magnet}, cover: {cover}, body_actress: {body_actress}")

        # Prefer the name found in the post body; otherwise guess from the title.
        final_actress = body_actress or self.extract_actress_from_title(title, designation)

        if magnet:
            self.save_to_db({
                'tid': tid_str,
                'fid': fid,
                'category_name': category_name,
                'designation': designation,
                'actress': final_actress,
                'title': title,
                'magnet_link': magnet,
                'cover_image': cover,
                'post_url': full_url,
                'publish_date': publish_date,
            })

    def crawl_forum(self, fid, category_name):
        """Crawl up to ``MAX_PAGES_PER_FID`` list pages of one board.

        In daily mode with ``ONLY_CRAWL_TODAY`` set, threads not dated today
        are skipped, and paging stops once more than 20 such threads have been
        seen in a row (checked at the start of each page). Errors on a single
        thread or a single list page are logged and crawling continues.

        Args:
            fid: board id used in the list-page URL.
            category_name: board name stored with every saved post.
        """
        logger.info(f"=== 开始板块: {category_name} (FID: {fid}) ===")
        consecutive_old_posts = 0
        today_only = RUN_MODE == 'daily' and ONLY_CRAWL_TODAY

        for page in range(1, MAX_PAGES_PER_FID + 1):
            if RUN_MODE == 'daily' and consecutive_old_posts > 20:
                break

            list_url = f"{self.BASE_URL}forum-{fid}-{page}.html"
            logger.info(f"正在爬取第 {page} 页")

            try:
                self.driver.get(list_url)
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located(
                        (By.CSS_SELECTOR, 'tbody[id^="normalthread"]')))

                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                threads = soup.find_all(
                    'tbody', {'id': lambda x: x and x.startswith('normalthread')})
                if not threads:
                    break

                for thread in threads:
                    try:
                        title_tag = thread.find('a', {'class': 's xst'})
                        if not title_tag:
                            continue

                        raw_date = self._thread_date_text(thread)
                        if raw_date is None:
                            logger.warning("帖子缺少日期单元格, 已跳过")
                            continue
                        publish_date = self.parse_relative_date(raw_date)
                        logger.debug("publish_date: " + publish_date)

                        if today_only:
                            if publish_date != self.today_str:
                                consecutive_old_posts += 1
                                continue
                            consecutive_old_posts = 0

                        self._archive_thread(
                            thread, title_tag, fid, category_name, publish_date)
                    except Exception as e:
                        logger.error(f"单贴异常: {e}")
                        continue

            except Exception as e:
                logger.error(f"列表页异常: {e}")

    def close(self):
        """Release the database connection and the browser; safe to call twice.

        Both shutdowns are best-effort: a failure in one never prevents the
        other from running.
        """
        conn, self.conn = self.conn, None
        if conn is not None:
            try:
                if conn.is_connected():
                    conn.close()
            except Exception as e:
                logger.debug(f"关闭数据库连接失败: {e}")

        driver, self.driver = self.driver, None
        if driver:
            # close() the window first, then quit() the driver process.
            for shutdown in (driver.close, driver.quit):
                try:
                    shutdown()
                except Exception:
                    pass

    def run(self):
        """Pass the age gate, crawl every configured board, then clean up."""
        try:
            self.bypass_age_verification()
            for fid, name in TARGET_FIDS.items():
                self.crawl_forum(fid, name)
        finally:
            self.close()
|
||
|
||
|
||
# Script entry point: build the crawler (which connects to MySQL and starts
# the browser) and run one full crawl.
if __name__ == "__main__":
    SehuatangCrawler().run()
|