#! /usr/bin/env python3 # -*- coding: utf-8 -*- import json import re from typing import Optional from loguru import logger import time from datetime import datetime import requests from lxml import etree from base import func_english_news class News(object): def __init__(self) -> None: self.LOG = logger self.week = {0: "周一", 1: "周二", 2: "周三", 3: "周四", 4: "周五", 5: "周六", 6: "周日"} self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/110.0"} def get_important_news(self): url = "https://www.cls.cn/api/sw?app=CailianpressWeb&os=web&sv=7.7.5" data = {"type": "telegram", "keyword": "你需要知道的隔夜全球要闻", "page": 0, "rn": 1, "os": "web", "sv": "7.7.5", "app": "CailianpressWeb"} try: rsp = requests.post(url=url, headers=self.headers, data=data) data = json.loads(rsp.text)["data"]["telegram"]["data"][0] news = data["descr"] timestamp = data["time"] ts = time.localtime(timestamp) weekday_news = datetime(*ts[:6]).weekday() except Exception as e: self.LOG.error(e) return "" weekday_now = datetime.now().weekday() if weekday_news != weekday_now: return "" # 旧闻,观察发现周二~周六早晨6点半左右发布 fmt_time = time.strftime("%Y年%m月%d日", ts) news = re.sub(r"(\d{1,2}、)", r"\n\1", news) fmt_news = "".join(etree.HTML(news).xpath(" // text()")) fmt_news = re.sub(r"周[一|二|三|四|五|六|日]你需要知道的", r"", fmt_news) return f"{fmt_time} {self.week[weekday_news]}\n{fmt_news}" def get_baidu_news(self): url = "https://top.baidu.com/api/board?platform=wise&tab=realtime" # 获取当前日期和英文星期名 now = datetime.now() current_date = now.strftime("%Y年%m月%d日") english_weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] chinese_weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"] # 将英文星期名映射为中文 current_weekday_index = now.weekday() # 获取当前是星期几(0代表星期一,6代表星期日) current_weekday_chinese = chinese_weekdays[current_weekday_index] # 初始化一个空字符串来存储结果 output = f"当前日期:{current_date} {current_weekday_chinese}\n\n" try: response = requests.get(url, headers=self.headers, timeout=10) response.raise_for_status() if response.status_code == 200: post = response.json() cards = post.get('data', {}).get('cards', []) index = 1 for card in cards: blocks = card.get('content', []) for block in blocks: articles = block.get('content', []) for article in articles: if isinstance(article, dict) and 'word' in article: title = str(article.get('word', '')).strip().replace(" ", "_") raw_url = str(article.get('url', '')).strip() url = raw_url.strip('`').strip() output += f"{index} :#{title}\n" index += 1 # 输出最终的字符串 return output else: self.LOG.error(f"获取百度新闻失败,状态码: {response.status_code}") return "获取百度新闻失败,请稍后再试" except Exception as e: self.LOG.error(f"获取百度新闻时出错: {e}") return f"获取百度新闻时出错: {e}" def get_eng_news(self, website): if website == 'nbc': return func_english_news.nbc() elif website == 'cnn': return func_english_news.cnn() elif website == 'abc': return func_english_news.abc() elif website == 'fox': return func_english_news.fox() elif website == 'bbc': return func_english_news.bbc() def get_news_60s(self) -> Optional[str]: """ 调用 60s 接口并提取 image 字段 :return: image url 或 None """ API_URL = "http://192.168.2.32:4399/v2/60s" try: resp = requests.get(API_URL) resp.raise_for_status() # HTTP 非 200 会抛异常 data = resp.json() return data.get("data", {}).get("image") except requests.RequestException as e: print(f"请求接口失败: {e}") except ValueError as e: print(f"JSON 解析失败: {e}") return None if __name__ == "__main__": news = News() print(news.get_baidu_news()) # # msg = "@水牛-分身 今日百度新闻" # # q = re.sub(r"@.*?[\u2005|\s]", "", msg).replace(" ", "") # # print(q) # print(news.get_eng_news('nbc'))