Files
abot/utils/media_downloader.py

153 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import aiohttp
import aiofiles
import uuid
from typing import Optional
from urllib.parse import urlparse
from loguru import logger
import time
class MediaDownloader:
    """Download remote media files (images, audio, video, ...) to a local directory."""

    # (lower-case URL-path suffix, canonical extension) pairs used to infer a
    # file type directly from the URL.
    _PATH_SUFFIXES = (
        (".jpg", "jpg"),
        (".jpeg", "jpg"),
        (".png", "png"),
        (".gif", "gif"),
        (".mp4", "mp4"),
        (".mp3", "mp3"),
        (".pdf", "pdf"),
    )

    # (media type, canonical extension) pairs used to infer a file type from a
    # Content-Type header. Covers the same file types as _PATH_SUFFIXES.
    _CONTENT_TYPES = (
        ("image/jpeg", "jpg"),
        ("image/png", "png"),
        ("image/gif", "gif"),
        ("video/mp4", "mp4"),
        ("audio/mpeg", "mp3"),
        ("application/pdf", "pdf"),
    )

    def __init__(self, download_dir: Optional[str] = None):
        """
        Initialise the downloader and make sure the download directory exists.

        Args:
            download_dir: Directory to store downloads in. Defaults to a
                ``media_downloads`` directory located next to the package
                directory that contains this file.
        """
        self.LOG = logger
        self.download_dir = download_dir or os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "media_downloads",
        )
        os.makedirs(self.download_dir, exist_ok=True)
        self.LOG.info(f"媒体下载目录: {self.download_dir}")

    @staticmethod
    def _filename_from_url(url: str) -> Optional[str]:
        """Return the file name in the URL path, or None if it has no usable one."""
        filename = os.path.basename(urlparse(url).path)
        # "." and ".." contain a dot but name directories, not files.
        if not filename or "." not in filename or filename in (".", ".."):
            return None
        return filename

    @classmethod
    def _extension_from_content_type(cls, content_type: str) -> Optional[str]:
        """Map a Content-Type header value to a file extension, or None if unknown."""
        # Media types are case-insensitive and may carry parameters
        # (e.g. "image/jpeg; charset=binary"), hence the substring match.
        normalized = (content_type or "").lower()
        for media_type, ext in cls._CONTENT_TYPES:
            if media_type in normalized:
                return ext
        return None

    async def download_media(self, url: str, file_type: Optional[str] = None) -> Optional[str]:
        """
        Download a media file into the download directory.

        The data is streamed into a temporary ``.part`` file and only moved to
        its final name once the transfer completed, so a failed or cancelled
        download never leaves a truncated file behind under the final name.
        After a successful download, expired files are cleaned up.

        Args:
            url: URL of the media file.
            file_type: File extension (e.g. 'jpg', 'png') used when the URL
                carries no file name with an extension. If omitted, the type
                is inferred from the URL.

        Returns:
            Absolute local path of the downloaded file, or None if the
            download failed.
        """
        tmp_path = None
        try:
            filename = self._filename_from_url(url)
            if filename is None:
                # No usable name in the URL: generate a random one.
                ext = file_type if file_type else await self._guess_file_type(url)
                if ext:
                    ext = ext.lstrip(".")  # tolerate ".jpg" as well as "jpg"
                stem = uuid.uuid4().hex
                filename = f"{stem}.{ext}" if ext else stem

            local_path = os.path.join(self.download_dir, filename)
            # Unique suffix so concurrent downloads of the same name never
            # write into the same temporary file.
            tmp_path = f"{local_path}.{uuid.uuid4().hex}.part"
            self.LOG.info(f"开始下载媒体文件: {url} -> {local_path}")

            request_timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=request_timeout) as response:
                    response.raise_for_status()
                    async with aiofiles.open(tmp_path, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            if chunk:
                                await f.write(chunk)

            # Publish the finished file under its final name.
            await asyncio.to_thread(os.replace, tmp_path, local_path)
            tmp_path = None

            self.LOG.info(f"媒体文件下载成功: {local_path}")
            # Clean up old files after a successful download.
            await self.clear_downloads()
            return os.path.abspath(local_path)
        except Exception as e:
            self.LOG.error(f"下载媒体文件失败: {url}, 错误: {e}")
            return None
        finally:
            # Also runs on cancellation: drop the partial file, best effort.
            if tmp_path is not None:
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    async def _guess_file_type(self, url: str) -> Optional[str]:
        """
        Infer the file type of a URL.

        The URL path suffix is checked first; if it is not a known suffix, a
        HEAD request is issued and the Content-Type header is inspected.

        Args:
            url: URL of the media file.

        Returns:
            File extension without the leading dot, or None if it cannot be
            inferred.
        """
        try:
            path = urlparse(url).path.lower()
            for suffix, ext in self._PATH_SUFFIXES:
                if path.endswith(suffix):
                    return ext

            # Fall back to the Content-Type header.
            probe_timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession() as session:
                # HEAD does not follow redirects by default, unlike the GET
                # used for the download; follow them so the header describes
                # the resource that will actually be fetched.
                async with session.head(
                    url, timeout=probe_timeout, allow_redirects=True
                ) as response:
                    content_type = response.headers.get('Content-Type', '')
            return self._extension_from_content_type(content_type)
        except Exception as e:
            # Best effort: the caller falls back to a name without extension.
            # Not a bare ``except`` so task cancellation still propagates.
            self.LOG.debug(f"推断文件类型失败: {url}, 错误: {e}")
            return None

    def _remove_expired_files(self, max_age_seconds: float) -> int:
        """Delete regular files older than max_age_seconds; return how many were deleted."""
        now = time.time()
        removed = 0
        with os.scandir(self.download_dir) as entries:
            for entry in entries:
                file_path = entry.path
                try:
                    # Only regular files are considered; directories are kept.
                    if not entry.is_file():
                        continue
                    if now - entry.stat().st_mtime <= max_age_seconds:
                        continue
                    os.remove(file_path)
                except FileNotFoundError:
                    # Already removed, e.g. by a concurrent cleanup.
                    continue
                except Exception as e:
                    self.LOG.error(f"删除文件失败 {file_path}: {e}")
                    continue
                removed += 1
                self.LOG.debug(f"已删除过期文件: {file_path}")
        return removed

    async def clear_downloads(self, max_age_days: int = 3) -> None:
        """
        Delete downloaded files whose last modification is older than the limit.

        The directory scan and the deletions run in a worker thread so the
        event loop is not blocked by file-system calls. Errors are logged and
        never raised.

        Args:
            max_age_days: Maximum age in days a file may have before it is
                deleted. Defaults to 3.
        """
        try:
            max_age_seconds = max_age_days * 24 * 60 * 60
            cleared_count = await asyncio.to_thread(
                self._remove_expired_files, max_age_seconds
            )
            if cleared_count > 0:
                self.LOG.info(f"清理完成,共删除 {cleared_count} 个过期文件")
        except Exception as e:
            self.LOG.error(f"清理下载文件时出错: {e}")