删除以前历史的秀人下图逻辑,已经迁移至组件里面。

This commit is contained in:
liuwei
2025-04-15 14:56:58 +08:00
parent 24bafc4c20
commit 89af94e29c
5 changed files with 1 additions and 271 deletions

View File

@@ -37,7 +37,7 @@ def main(chat_type: int):
robot.onEveryTime("10:30", robot.send_epic_free_games)
# message report 1:数据自动从redis 转到sqllite
robot.onEveryTime("00:30", robot.message_count_to_db)
robot.onEveryTime("02:30", robot.message_count_to_db)
# 从db中提取并发送给相关群
robot.onEveryTime("09:30", robot.generate_and_send_ranking)

View File

@@ -1,3 +0,0 @@
[Xiuren]
enable = true
command = ["图来", "秀人"]

View File

@@ -1,39 +0,0 @@
import logging
import tomllib
from wcferry import WxMsg, Wcf
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from xiuren.random_pic import get_xiuren_pic
class Xiuren:
def __init__(self, wcf: Wcf, gbm: GroupBotManager):
self.LOG = logging.getLogger(__name__)
self.wcf = wcf # 假设 wcf 对象在此类中初始化
self.gbm = gbm # 权限功能
with open("xiuren/config.toml", "rb") as f:
plugin_config = tomllib.load(f)
config = plugin_config["Xiuren"]
self.enable = config["enable"]
self.command = config["command"]
self.LOG.info(f"[秀人] 组件初始化完成,指令: {self.command}")
def handle_message(self, message: WxMsg):
if not self.enable:
return
content = str(message.content).strip()
command = content.split(" ")
if command[0] not in self.command:
return
# 如果触发了指令,但是没有权限,则返回权限不足
if self.gbm.get_group_permission(message.roomid, Feature.PIC) == PermissionStatus.DISABLED:
return
pic_path = get_xiuren_pic()
self.wcf.send_file(pic_path, message.roomid)

View File

@@ -1,51 +0,0 @@
import os
import random
def get_random_file_from_dir(directory):
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
image_files = []
if not os.path.exists(directory):
print(f"Error: Directory '{directory}' does not exist.")
return None
if not os.access(directory, os.R_OK):
print(f"Error: No read access to directory '{directory}'.")
return None
print(f"Scanning directory: {directory} (including subdirectories)")
# 使用 os.walk() 递归遍历所有子目录
for root, _, files in os.walk(directory):
for file in files:
_, ext = os.path.splitext(file)
if ext.lower() in image_extensions:
full_path = os.path.join(root, file)
image_files.append(full_path)
if not image_files:
print("No image files found in the directory (including subdirectories).")
return None
return random.choice(image_files)
def get_xiuren_pic():
# 使用示例
directory = 'xiuren' # 替换为你的目录路径
random_file_path = get_random_file_from_dir(directory)
if random_file_path:
return os.path.abspath(random_file_path)
else:
return None
def get_xiuren_heisi_pic():
# 使用示例
directory = 'xiuren/heisi' # 替换为你的目录路径
random_file_path = get_random_file_from_dir(directory)
if random_file_path:
return os.path.abspath(random_file_path)
else:
return None

View File

@@ -1,177 +0,0 @@
import requests
from bs4 import BeautifulSoup
import os
import time
import random
import re
from xiuren.xiuren_pdf import generate_pdf_from_images
def get_html(url, session):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Referer': 'https://www.xiurenwang.cc/'
}
try:
response = session.get(url, headers=headers, verify=False)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"请求 {url} 失败: {e}")
return None
def parse_initial_page(html):
soup = BeautifulSoup(html, 'html.parser')
posts = soup.find_all('a', href=lambda x: x and x.endswith('.html'))
# 取所有帖子,而不是仅前两个,以便后续查找未下载的帖子
post_info = []
for post in posts:
text = post.text.strip()
number_match = re.search(r'No\.(\d+)', text)
number = number_match.group(1) if number_match else None
if number:
url = 'https://www.xiurenwang.cc/' + post['href']
post_info.append({'url': url, 'number': number})
return post_info
def extract_post_details(html):
soup = BeautifulSoup(html, 'html.parser')
title = soup.title.text.strip() if soup.title else "未知标题"
# 提取可见图片URL
image_div = soup.find('div', id='image')
visible_image_urls = []
if image_div:
images = image_div.find_all('img', {'data-original': True})
visible_image_urls = [img.get('data-original') for img in images]
# 提取总图片数量
total_images = None
sp_div = soup.find('div', class_='sp')
if sp_div:
i_tags = sp_div.find_all('i', class_='i1')
if i_tags:
total_text = i_tags[0].text.strip()
number_match = re.search(r'(\d+)', total_text)
total_images = int(number_match.group(1)) if number_match else None
return title, visible_image_urls, total_images
def generate_image_urls(visible_image_urls, total_images):
if not visible_image_urls or not total_images:
print("未找到可见图片URL或总图片数")
return []
# 提取编号和基础路径
numbers = [int(url.split('/')[-1].split('.')[0]) for url in visible_image_urls]
min_number = min(numbers)
base_url = visible_image_urls[0].rsplit('/', 1)[0] + '/'
# 如果base_url已包含https://,不需要再次添加
if not base_url.startswith('https://'):
base_url = 'https://' + base_url.lstrip('/')
# 生成所有图片URL
image_urls = []
for i in range(total_images):
image_number = min_number + i
image_url = f"{base_url}{image_number}.jpg"
image_urls.append(image_url)
return image_urls
def download_image(image_url, filename, session, post_url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Referer': post_url
}
try:
response = session.get(image_url, headers=headers, verify=False)
response.raise_for_status()
with open(filename, 'wb') as f:
f.write(response.content)
print(f"已下载 {image_url}")
except requests.exceptions.RequestException as e:
print(f"下载 {image_url} 失败: {e}")
def download_images(image_urls, output_dir, session, post_url):
if not image_urls:
print("没有可下载的图片URL")
return
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for i, image_url in enumerate(image_urls):
if not image_url:
print(f"无效URL在索引 {i}")
continue
filename = os.path.join(output_dir, f"{i + 1}.jpg")
download_image(image_url, filename, session, post_url)
# time.sleep(random.uniform(1, 3))
def xiuren_dowload_pic():
session = requests.Session()
initial_url = 'https://www.xiurenwang.cc/bang?f=7'
initial_html = get_html(initial_url, session)
if not initial_html:
print("无法获取初始页面")
return
post_info = parse_initial_page(initial_html)
if not post_info:
print("未找到有效帖子")
return
processed_count = 0 # 记录已处理的帖子数量
target_count = 1 # 目标处理2个新帖子
for post in post_info:
if processed_count >= target_count:
break
post_url = post['url']
post_number = post['number']
output_dir = post_number
# 检查本地文件夹是否已存在
if os.path.exists(output_dir):
print(f"帖子 {post_number} 的文件夹已存在,跳过")
continue
post_html = get_html(post_url, session)
if not post_html:
print(f"无法获取帖子 {post_number} 的页面")
continue
title, visible_image_urls, total_images = extract_post_details(post_html)
print(f"处理帖子 {post_number} - 标题: {title}, 总图片数: {total_images}")
if not visible_image_urls or not total_images:
print(f"帖子 {post_number} 缺少图片URL或总数跳过")
continue
image_urls = generate_image_urls(visible_image_urls, total_images)
if not image_urls:
print(f"帖子 {post_number} 未生成图片URL跳过")
continue
download_images(image_urls, output_dir, session, post_url)
print(f"完成处理帖子 {post_number}")
processed_count += 1
# 将下载好的帖子生成PDF
return generate_pdf_from_images('.')
if __name__ == '__main__':
xiuren_dowload_pic()