Files
abot/utils/sehuatang/sehuatang_bot.py
2026-01-07 16:01:04 +08:00

473 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import os
import re
import requests
import mysql.connector
from mysql.connector import Error
from selenium.webdriver.common.by import By
import undetected_chromedriver as uc
# On Windows only, replace undetected_chromedriver's Chrome.__del__ with a
# no-op. NOTE(review): presumably this silences errors raised by the
# destructor during interpreter shutdown -- confirm against the uc version
# in use.
if os.name == 'nt':
    try:
        uc.Chrome.__del__ = lambda self: None
    except Exception:
        # Best effort: if the attribute cannot be patched, carry on unpatched.
        pass
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from loguru import logger
from datetime import datetime, timedelta
# ================= Configuration =================
# Run mode: 'full' (crawl the whole backlog) or 'daily' (incremental).
# Can be overridden with the SEHUATANG_RUN_MODE environment variable; any
# unrecognised value falls back to 'daily', which is the historical default.
RUN_MODE = os.environ.get('SEHUATANG_RUN_MODE', 'daily').strip().lower()
if RUN_MODE not in ('full', 'daily'):
    RUN_MODE = 'daily'

# MySQL connection settings passed verbatim to mysql.connector.connect().
# Each credential can be supplied through the environment so that secrets do
# not have to live in the source tree; the defaults keep the previous
# hard-coded values so existing deployments continue to work unchanged.
DB_CONFIG = {
    'host': os.environ.get('SEHUATANG_DB_HOST', '192.168.2.41'),
    'port': int(os.environ.get('SEHUATANG_DB_PORT', '3306')),
    'user': os.environ.get('SEHUATANG_DB_USER', 'root'),
    'password': os.environ.get('SEHUATANG_DB_PASSWORD', 'lw123456'),
    # mysql.connector uses the key 'database' (pymysql calls it 'db').
    'database': os.environ.get('SEHUATANG_DB_NAME', 'message_archive'),
    'charset': 'utf8mb4',
    # Optional: use the pure-Python implementation to avoid C-extension issues.
    'use_pure': True,
}

# Forum board id -> board name for every board that gets crawled.
TARGET_FIDS = {
    103: '高清中文字幕',
    104: '素人有码系列',
    37: '亚洲有码原创',
    36: '亚洲无码原创',
    39: '动漫原创',
    160: 'vr',
    151: '4k',
    2: '国产原创',
    38: '欧美无码',
    107: '三级写真',
    152: '韩国主播',
}

# Stop words used by the title-based actress guess (fallback strategy).
# Candidate tokens are always 2-4 characters long, so empty-string entries
# could never match and have been dropped, as has a duplicated keyword.
BLACKLIST_KEYWORDS = {
    '高清', '中文', '字幕', '无码', '有码', '步兵', '骑兵', '破解', '流出',
    '合集', '系列', '下载', '推荐', '新作', '大片', '偷拍', '自拍', '精选',
    '汉化', '日韩', '欧美', '国产', '主播', '成人', '视频', '影片', '最新',
    '强力', '严选', '首发', '独家', '今日', '更新', '特辑', '重磅',
    '清晰', '完整',
}

# Limits derived from the run mode: a full crawl walks deep into every board,
# a daily crawl only looks at the first pages and keeps today's posts.
if RUN_MODE == 'full':
    MAX_PAGES_PER_FID = 500
    ONLY_CRAWL_TODAY = False
else:
    MAX_PAGES_PER_FID = 5
    ONLY_CRAWL_TODAY = True
class SehuatangCrawler:
    """Crawl forum boards with a Selenium-driven browser and archive posts in MySQL.

    Typical use is ``SehuatangCrawler().run()``: the constructor opens the
    database connection, makes sure the ``forum_posts`` table exists and
    starts the browser; ``run()`` walks every board in ``TARGET_FIDS`` and
    always releases the resources afterwards.
    """

    _BASE_URL = "https://www.sehuatang.net/"

    # Magnet URI up to the first whitespace, quote, angle bracket or CJK char.
    _MAGNET_RE = re.compile(
        r'(magnet:\?xt=urn:btih:[a-zA-Z0-9]+[^\s"\'<>\u4e00-\u9fa5]*)')
    # Looser variant used on the whole post text; cleaned up by clean_magnet().
    _MAGNET_LINE_RE = re.compile(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]+.*')
    # "[actress]: name" line in the post body. Both the ASCII colon and the
    # full-width colon (U+FF1A) are accepted as the separator.
    _ACTRESS_RE = re.compile(
        r'(?:【|\[)(?:出演)?女优(?:】|\])\s*[:\uff1a]\s*([^\n<]+)')
    # Designation patterns, tried in priority order against the upper-cased
    # title; the first pattern that matches anywhere wins.
    _DESIGNATION_PATTERNS = tuple(re.compile(p) for p in (
        r'FC2-PPV-\d+',
        r'\d{6}-\d{3}',
        r'[A-Z]{2,5}-\d{3,5}',
        r'T28-\d{3}',
        r'[A-Z]{2,5}\d{3,5}',
    ))

    def __init__(self):
        """Connect to MySQL, ensure the table exists and start the browser.

        Raises:
            Exception: whatever the database connection or the browser
                start-up raised; anything already opened is closed first.
        """
        # Define every attribute up front so close() is safe even when the
        # initialisation below fails half-way.
        self.conn = None
        self.driver = None
        self.session = None
        self.today_str = datetime.now().strftime('%Y-%m-%d')
        try:
            self._connect_db()
            self._init_db_table()
            self.driver = self._init_driver()
        except Exception:
            # Do not leak the DB connection when the browser cannot start.
            self.close()
            raise

    def _connect_db(self):
        """Open the MySQL connection described by ``DB_CONFIG``.

        Raises:
            mysql.connector.Error: if the connection cannot be established.
        """
        try:
            self.conn = mysql.connector.connect(**DB_CONFIG)
            if self.conn.is_connected():
                logger.info("数据库连接成功")
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def _init_db_table(self):
        """Create the ``forum_posts`` table if it does not exist yet.

        A failure is logged but not raised, so start-up continues.
        """
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS forum_posts (
            id INT AUTO_INCREMENT PRIMARY KEY,
            tid VARCHAR(50) NOT NULL UNIQUE COMMENT '帖子ID',
            fid INT NOT NULL COMMENT '板块ID',
            category_name VARCHAR(50) COMMENT '板块名称',
            designation VARCHAR(100) COMMENT '番号',
            actress VARCHAR(100) COMMENT '出演女优',
            title VARCHAR(255) NOT NULL COMMENT '标题',
            magnet_link TEXT COMMENT '磁力链接',
            cover_image VARCHAR(500) COMMENT '封面图URL',
            post_url VARCHAR(255) NOT NULL COMMENT '帖子链接',
            publish_date VARCHAR(20) COMMENT '发布时间',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_designation (designation),
            INDEX idx_actress (actress),
            INDEX idx_publish_date (publish_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(create_table_sql)
            self.conn.commit()
        except Error as e:
            logger.error(f"表结构初始化失败: {e}")
        finally:
            if cursor:
                cursor.close()

    def _init_driver(self):
        """Start and return an undetected-chromedriver Chrome instance."""
        options = uc.ChromeOptions()
        # Key settings for avoiding bot detection. The browser runs with a
        # visible window so a challenge can be passed by hand if needed.
        options.headless = False
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument('--disable-dev-shm-usage')
        return uc.Chrome(options=options)

    def bypass_age_verification(self):
        """Open the home page, click through the age gate and mirror the session.

        After the (optional) click, a ``requests.Session`` carrying the
        browser's user agent and cookies is stored in ``self.session``.
        Any failure is logged as a warning and otherwise ignored.
        """
        try:
            self.driver.get(self._BASE_URL)
            # Try the exact link text first, then a shorter variant.
            attempts = (
                ('//a[contains(text(), "满18岁请点此进入")]', 6),
                ('//a[contains(text(), "满18岁")]', 4),
            )
            for xpath, timeout in attempts:
                try:
                    button = WebDriverWait(self.driver, timeout).until(
                        EC.element_to_be_clickable((By.XPATH, xpath)))
                    button.click()
                    logger.success("通过年龄验证")
                    break
                except Exception:
                    # No such link (or not clickable): the gate may simply
                    # not be shown, so keep going.
                    continue

            ua = self.driver.execute_script("return navigator.userAgent")
            self.session = requests.Session()
            self.session.headers.update({'User-Agent': ua, 'Referer': self._BASE_URL})
            for c in self.driver.get_cookies():
                try:
                    self.session.cookies.set(
                        c['name'], c['value'],
                        domain=c.get('domain'), path=c.get('path', '/'))
                except Exception:
                    # Fall back to a cookie without domain/path scoping.
                    self.session.cookies.set(c['name'], c['value'])
        except Exception as e:
            logger.warning(f"主页访问异常: {e}")

    def parse_relative_date(self, date_str):
        """Convert a forum timestamp into a zero-padded ``YYYY-MM-DD`` string.

        Handles relative forms (seconds / minutes / hours / days ago,
        yesterday, the day before yesterday), absolute ``YYYY-M-D`` dates
        with an optional trailing time, and year-less ``M-D`` dates.

        Args:
            date_str: raw text taken from the thread list.

        Returns:
            The normalised date. Text that cannot be interpreted (including
            an empty string) yields today's date.
        """
        now = datetime.now()
        fmt = '%Y-%m-%d'
        date_str = (date_str or '').strip()

        def amount():
            # First integer in the text; 0 when there is none.
            found = re.search(r'(\d+)', date_str)
            return int(found.group(1)) if found else 0

        try:
            if "秒前" in date_str or "刚刚" in date_str:
                return now.strftime(fmt)
            if "分钟前" in date_str:
                return (now - timedelta(minutes=amount())).strftime(fmt)
            if "小时前" in date_str:
                return (now - timedelta(hours=amount())).strftime(fmt)
            if "昨天" in date_str:
                return (now - timedelta(days=1)).strftime(fmt)
            if "前天" in date_str:
                return (now - timedelta(days=2)).strftime(fmt)
            if "天前" in date_str:
                return (now - timedelta(days=amount())).strftime(fmt)

            # Absolute date: keep only the date part and zero-pad it so that
            # string comparison against today's date works.
            full = re.match(r'(\d{4})-(\d{1,2})-(\d{1,2})', date_str)
            if full:
                year, month, day = full.groups()
                return f"{year}-{month.zfill(2)}-{day.zfill(2)}"

            short = re.match(r'(\d{1,2})-(\d{1,2})', date_str)
            if short:
                month, day = int(short.group(1)), int(short.group(2))
                year = now.year
                # A post cannot be dated in the future: a month/day later
                # than today must belong to the previous year.
                if (month, day) > (now.month, now.day):
                    year -= 1
                return f"{year}-{month:02d}-{day:02d}"
            return now.strftime(fmt)
        except Exception:
            return now.strftime(fmt)

    def extract_designation(self, title):
        """Return the first designation code found in *title*, or ``""``.

        The title is upper-cased before matching, so the result is always
        upper case.
        """
        upper_title = title.upper()
        for pattern in self._DESIGNATION_PATTERNS:
            found = pattern.search(upper_title)
            if found:
                return found.group(0)
        return ""

    # ========== Strategy B: guess the actress from the title (fallback) ==========
    def extract_actress_from_title(self, title, designation):
        """Guess the actress name from the title when the post body has none.

        The designation, bracketed segments and common format tags are
        removed, everything except CJK characters is blanked out, and the
        first remaining 2-4 character token that is not a blacklisted
        keyword is returned.

        Returns:
            The guessed name, or ``""`` when no token qualifies.
        """
        if designation:
            title = re.sub(re.escape(designation), '', title, flags=re.IGNORECASE)
        title = re.sub(r'(?i)\[.*?\]|【.*?】|\(.*?\)|mp4|1080p|720p|4k', ' ', title)
        cjk_only = re.sub(r'[^\u4e00-\u9fa5\s]', ' ', title)
        return next(
            (token for token in cjk_only.split()
             if 2 <= len(token) <= 4 and token not in BLACKLIST_KEYWORDS),
            "")

    def check_exists(self, tid, designation):
        """Tell whether a post is already stored, by thread id or designation.

        Returns:
            ``True`` if a row with the same ``tid`` or, when *designation*
            is non-empty, the same designation exists. ``False`` otherwise,
            and also when the lookup itself fails (the failure is logged).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT id FROM forum_posts WHERE tid = %s LIMIT 1", (tid,))
            if cursor.fetchone():
                return True
            if designation:
                cursor.execute(
                    "SELECT id FROM forum_posts WHERE designation = %s LIMIT 1",
                    (designation,))
                if cursor.fetchone():
                    return True
            return False
        except Error as e:
            # Treat the post as new, but leave a trace of the DB problem.
            logger.warning(f"查重失败: {e}")
            return False
        finally:
            if cursor:
                cursor.close()

    def clean_magnet(self, magnet_text):
        """Extract a clean magnet URI from text copied out of a code block.

        Returns:
            The magnet URI when one is found; otherwise the input with the
            "copy code" label removed and surrounding whitespace stripped.
            Falsy input yields ``""``.
        """
        if not magnet_text:
            return ""
        magnet_text = magnet_text.replace('复制代码', '').strip()
        found = self._MAGNET_RE.search(magnet_text)
        return found.group(1) if found else magnet_text

    def save_to_db(self, data):
        """Insert a post, or refresh its mutable columns if ``tid`` exists.

        Args:
            data: mapping with the keys ``tid``, ``fid``, ``category_name``,
                ``designation``, ``actress``, ``title``, ``magnet_link``,
                ``cover_image``, ``post_url`` and ``publish_date``.

        Database errors are logged and the transaction is rolled back;
        nothing is raised.
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            sql = """
            INSERT INTO forum_posts
            (tid, fid, category_name, designation, actress, title, magnet_link, cover_image, post_url, publish_date)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            title = VALUES(title),
            actress = VALUES(actress),
            magnet_link = VALUES(magnet_link),
            cover_image = VALUES(cover_image),
            updated_at = NOW()
            """
            cursor.execute(sql, (
                data['tid'], data['fid'], data['category_name'],
                data['designation'], data['actress'], data['title'],
                data['magnet_link'], data['cover_image'],
                data['post_url'], data['publish_date'],
            ))
            self.conn.commit()
            actress_log = f"[{data['actress']}] " if data['actress'] else ""
            logger.info(f"保存: {actress_log}{data['title'][:15]}...")
        except Error as e:
            logger.error(f"DB错误: {e}")
            if self.conn.is_connected():
                self.conn.rollback()
        finally:
            if cursor:
                cursor.close()

    # ================= Detail page parsing =================
    def _extract_magnet(self, content_td):
        """Return the magnet link found in the post body, or ``""``."""
        # The link normally sits as plain text inside <div class="blockcode">.
        magnet_div = content_td.find('div', {'class': 'blockcode'})
        if magnet_div:
            magnet_text = magnet_div.get_text(strip=True)
            logger.debug(f"找到blockcode, 内容: {magnet_text[:100]}")
            found = self._MAGNET_RE.search(magnet_text)
            if found:
                magnet_link = found.group(1)
                logger.debug(f"✓ 提取到磁力链接: {magnet_link[:50]}...")
                return magnet_link
            logger.debug("✗ blockcode中未匹配到磁力链接")
        else:
            logger.debug("✗ 未找到div class='blockcode'")

        # Fallback: search the whole post text.
        found = self._MAGNET_LINE_RE.search(content_td.get_text())
        if found:
            magnet_link = self.clean_magnet(found.group(0))
            logger.debug(f"✓ 兜底提取到磁力链接: {magnet_link[:50]}...")
            return magnet_link
        logger.debug("✗ 整个文本中也未找到磁力链接")
        return ""

    def _extract_cover(self, content_td):
        """Return the first usable image URL in the post body, or ``""``."""
        imgs = content_td.find_all('img')
        logger.debug(f"内容容器中共有 {len(imgs)} 个img标签")
        for img in imgs:
            # Prefer the 'zoomfile' attribute, then 'file'; skip the
            # placeholder image and non-absolute URLs.
            for attr in ('zoomfile', 'file'):
                url = img.get(attr)
                if url and url.startswith('http') and 'static/image/common/none.gif' not in url:
                    logger.debug(f"✓ 使用{attr}作为封面: {url[:50]}...")
                    return url
        logger.debug("✗ 未找到合适的封面图")
        return ""

    def _extract_actress(self, content_td):
        """Return the actress named in the post body, or ``""``."""
        text_content = content_td.get_text(separator='\n')
        found = self._ACTRESS_RE.search(text_content)
        # Only the first whitespace-separated word of the captured text is kept.
        words = found.group(1).split('<')[0].split() if found else []
        if not words:
            logger.debug("✗ 未匹配到女优信息")
            return ""
        logger.debug(f"✓ 提取到女优: {words[0]}")
        return words[0]

    def parse_detail_page(self, post_url):
        """Load a post in the browser and pull out magnet, cover and actress.

        The page is fetched with Selenium rather than ``requests`` (the
        original note says this was done to fix 403 responses).

        Returns:
            ``(magnet_link, cover_image, actress_in_body)``. Each element is
            ``""`` when it could not be found. Errors are logged and whatever
            was extracted before the failure is still returned.
        """
        magnet_link = ""
        cover_image = ""
        actress_in_body = ""
        try:
            self.driver.get(post_url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body')))
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            # Debug aid: the page title shows whether the real page loaded.
            page_title = soup.find('title')
            logger.debug(f"详情页标题: {page_title.get_text() if page_title else 'None'}, URL: {post_url}")

            # Post body: <td class="t_f">, else any <td id="postmessage_*">.
            content_td = soup.find('td', {'class': 't_f'})
            if not content_td:
                content_td = soup.find(
                    'td', {'id': lambda x: x and x.startswith('postmessage_')})

            if not content_td:
                logger.warning("✗ 未找到内容容器 (td class='t_f' 或 id='postmessage_*')")
                all_tds = soup.find_all('td')
                logger.debug(f"页面共有 {len(all_tds)} 个td标签")
                return magnet_link, cover_image, actress_in_body
            logger.debug(f"✓ 找到内容容器: id={content_td.get('id', '')}")

            magnet_link = self._extract_magnet(content_td)
            cover_image = self._extract_cover(content_td)
            actress_in_body = self._extract_actress(content_td)
        except Exception as e:
            logger.error(f"详情页解析异常: {e}, URL: {post_url}")
        return magnet_link, cover_image, actress_in_body

    @staticmethod
    def _thread_raw_date(thread):
        """Return the date text of a thread row, or ``None`` if it has no date cell."""
        by_cell = thread.find('td', {'class': 'by'})
        em = by_cell.find('em') if by_cell else None
        if em is None:
            return None
        # The date is usually wrapped in a <span>; when it is not, use the
        # <em> text itself instead of silently treating the post as new.
        span = em.find('span')
        return (span if span is not None else em).get_text()

    def crawl_forum(self, fid, category_name):
        """Crawl the thread list of one board and store every new post.

        Args:
            fid: numeric board id used in the list URL.
            category_name: board name stored alongside each post.

        In daily mode only posts dated today are processed, and paging stops
        once more than 20 consecutive older posts have been seen. Posts
        without a magnet link are not stored. Errors in a single thread or a
        single list page are logged and crawling continues.
        """
        logger.info(f"=== 开始板块: {category_name} (FID: {fid}) ===")
        today_only = RUN_MODE == 'daily' and ONLY_CRAWL_TODAY
        consecutive_old_posts = 0
        for page in range(1, MAX_PAGES_PER_FID + 1):
            if RUN_MODE == 'daily' and consecutive_old_posts > 20:
                break
            list_url = f"{self._BASE_URL}forum-{fid}-{page}.html"
            logger.info(f"正在爬取第 {page}")
            try:
                self.driver.get(list_url)
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located(
                        (By.CSS_SELECTOR, 'tbody[id^="normalthread"]')))
                # The list is parsed from a snapshot, so navigating to detail
                # pages below does not disturb the iteration.
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                threads = soup.find_all(
                    'tbody', {'id': lambda x: x and x.startswith('normalthread')})
                if not threads:
                    break
                for thread in threads:
                    try:
                        tid_str = thread.get('id', '').split('_')[-1]
                        title_tag = thread.find('a', {'class': 's xst'})
                        if not title_tag:
                            continue
                        title = title_tag.get_text()

                        raw_date = self._thread_raw_date(thread)
                        if raw_date is None:
                            logger.error(f"单贴异常: 未找到日期单元格 (tid={tid_str})")
                            continue
                        publish_date = self.parse_relative_date(raw_date)
                        logger.debug("publish_date: " + publish_date)

                        if today_only:
                            if publish_date != self.today_str:
                                consecutive_old_posts += 1
                                continue
                            consecutive_old_posts = 0

                        designation = self.extract_designation(title)
                        if self.check_exists(tid_str, designation):
                            continue

                        partial_url = title_tag.get('href')
                        if not partial_url:
                            continue
                        if partial_url.startswith(('http://', 'https://')):
                            full_url = partial_url
                        else:
                            full_url = f"{self._BASE_URL}{partial_url.lstrip('/')}"
                        logger.info("get url : " + full_url)

                        # Detail page data (including the actress, if listed).
                        magnet, cover, body_actress = self.parse_detail_page(full_url)
                        logger.debug(f"magnet: {magnet}, cover: {cover}, body_actress: {body_actress}")

                        # Prefer the actress found in the post body; fall
                        # back to guessing from the title.
                        final_actress = body_actress or self.extract_actress_from_title(
                            title, designation)

                        if magnet:
                            self.save_to_db({
                                'tid': tid_str,
                                'fid': fid,
                                'category_name': category_name,
                                'designation': designation,
                                'actress': final_actress,
                                'title': title,
                                'magnet_link': magnet,
                                'cover_image': cover,
                                'post_url': full_url,
                                'publish_date': publish_date,
                            })
                    except Exception as e:
                        logger.error(f"单贴异常: {e}")
                        continue
            except Exception as e:
                logger.error(f"列表页异常: {e}")

    def close(self):
        """Close the database connection and shut the browser down.

        Safe to call more than once and on a partially initialised instance.
        """
        if self.conn is not None:
            try:
                if self.conn.is_connected():
                    self.conn.close()
            except Exception as e:
                logger.warning(f"关闭数据库连接失败: {e}")
        if self.driver:
            # Close the window first, then end the driver session; either
            # step may fail if the browser is already gone.
            try:
                self.driver.close()
            except Exception:
                pass
            try:
                self.driver.quit()
            except Exception:
                pass

    def run(self):
        """Pass the age gate, crawl every configured board, then clean up."""
        try:
            self.bypass_age_verification()
            for fid, name in TARGET_FIDS.items():
                self.crawl_forum(fid, name)
        finally:
            self.close()
# Script entry point: build the crawler (this connects to MySQL and starts
# the browser) and run one full pass over the configured boards.
if __name__ == "__main__":
    crawler = SehuatangCrawler()
    crawler.run()