Refactoring

This commit is contained in:
Changhua
2023-11-26 00:11:48 +08:00
parent 6aafbc6eec
commit af2b317cd7
19 changed files with 85 additions and 57 deletions

45
base/chatglm/README.MD Normal file
View File

@@ -0,0 +1,45 @@
# ChatGLM3 集成使用说明
1. 需要取消配置中 chatglm 的注释, 并配置对应信息,使用 [ChatGLM3](https://github.com/THUDM/ChatGLM3), 启用最新版 ChatGLM3 根目录下 openai_api.py 获取 api 地址:
```yaml
# 如果要使用 chatglm取消下面的注释并填写相关内容
chatglm:
key: sk-012345678901234567890123456789012345678901234567 # 根据需要自己做key校验
api: http://localhost:8000/v1 # 根据自己的chatglm地址修改
proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
prompt: 你是智能聊天机器人,你叫小薇 # 根据需要对角色进行设定
file_path: F:/Pictures/temp #设定生成图片和代码使用的文件夹路径
```
2. 修改 chatglm/tool_registry.py 工具里面的一下配置comfyUI 地址或者根据需要自己配置一些工具,函数名上需要加 @register_tool, 函数里面需要叫'''函数描述''',参数需要用 Annotated[str,'',True] 修饰,分别是类型,参数说明,是否必填,再加 ->加上对应的返回类型
```python
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("func_chatglm\\base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed'] = ''.join(
random.sample('123456789012345678901234567890', 14))
# 模型名称
data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors'
data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词
# data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']}
```
3. 使用 Code Interpreter 还需要安装 Jupyter 内核,默认名称叫 chatglm3
```
ipython kernel install --name chatglm3 --user
```
如果名称需要自定义可以配置系统环境变量IPYKERNEL 或者修改 chatglm/code_kernel.py
```
IPYKERNEL = os.environ.get('IPYKERNEL', 'chatglm3')
```
4. 启动后,发送 #帮助 可以查看 模式和常用指令

13
base/chatglm/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
import sys
class UnsupportedPythonVersionError(Exception):
def __init__(self, error_msg: str):
super().__init__(error_msg)
python_version_info = sys.version_info
if not sys.version_info >= (3, 9):
msg = "当前Python版本: " + ".".join(map(str, python_version_info[:3])) + (', 需要python版本 >= 3.9, 前往下载: '
'https://www.python.org/downloads/')
raise UnsupportedPythonVersionError(msg)

88
base/chatglm/base.json Normal file
View File

@@ -0,0 +1,88 @@
{
"prompt": {
"3": {
"inputs": {
"seed": 1000573256060686,
"steps": 20,
"cfg": 8,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1,
"model": [
"4",
0
],
"positive": [
"6",
0
],
"negative": [
"7",
0
],
"latent_image": [
"5",
0
]
},
"class_type": "KSampler"
},
"4": {
"inputs": {
"ckpt_name": "(修复)512-inpainting-ema.safetensors"
},
"class_type": "CheckpointLoaderSimple"
},
"5": {
"inputs": {
"width": 512,
"height": 512,
"batch_size": 1
},
"class_type": "EmptyLatentImage"
},
"6": {
"inputs": {
"text": "beautiful scenery nature glass bottle landscape, , purple galaxy bottle,dress, ",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode"
},
"7": {
"inputs": {
"text": "text, watermark",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode"
},
"8": {
"inputs": {
"samples": [
"3",
0
],
"vae": [
"4",
2
]
},
"class_type": "VAEDecode"
},
"9": {
"inputs": {
"filename_prefix": "ComfyUI",
"images": [
"8",
0
]
},
"class_type": "SaveImage"
}
}
}

199
base/chatglm/code_kernel.py Normal file
View File

@@ -0,0 +1,199 @@
import base64
import os
import queue
import re
from io import BytesIO
from subprocess import PIPE
from typing import Optional, Union
import jupyter_client
from PIL import Image
IPYKERNEL = os.environ.get('IPYKERNEL', 'chatglm3')
class CodeKernel(object):
def __init__(self,
kernel_name='kernel',
kernel_id=None,
kernel_config_path="",
python_path=None,
ipython_path=None,
init_file_path="./startup.py",
verbose=1):
self.kernel_name = kernel_name
self.kernel_id = kernel_id
self.kernel_config_path = kernel_config_path
self.python_path = python_path
self.ipython_path = ipython_path
self.init_file_path = init_file_path
self.verbose = verbose
if python_path is None and ipython_path is None:
env = None
else:
env = {"PATH": self.python_path + ":$PATH",
"PYTHONPATH": self.python_path}
# Initialize the backend kernel
self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL,
connection_file=self.kernel_config_path,
exec_files=[
self.init_file_path],
env=env)
if self.kernel_config_path:
self.kernel_manager.load_connection_file()
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_config_path))
else:
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_manager.connection_file))
if verbose:
print(self.kernel_manager.get_connection_info())
# Initialize the code kernel
self.kernel = self.kernel_manager.blocking_client()
# self.kernel.load_connection_file()
self.kernel.start_channels()
print("Code kernel started.")
def execute(self, code):
self.kernel.execute(code)
try:
shell_msg = self.kernel.get_shell_msg(timeout=40)
io_msg_content = self.kernel.get_iopub_msg(timeout=40)['content']
while True:
msg_out = io_msg_content
# Poll the message
try:
io_msg_content = self.kernel.get_iopub_msg(timeout=40)[
'content']
if 'execution_state' in io_msg_content and io_msg_content['execution_state'] == 'idle':
break
except queue.Empty:
break
return shell_msg, msg_out
except Exception as e:
print(e)
return None
def execute_interactive(self, code, verbose=False):
shell_msg = self.kernel.execute_interactive(code)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def inspect(self, code, verbose=False):
msg_id = self.kernel.inspect(code)
shell_msg = self.kernel.get_shell_msg(timeout=30)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def get_error_msg(self, msg, verbose=False) -> Optional[str]:
if msg['content']['status'] == 'error':
try:
error_msg = msg['content']['traceback']
except BaseException:
try:
error_msg = msg['content']['traceback'][-1].strip()
except BaseException:
error_msg = "Traceback Error"
if verbose:
print("Error: ", error_msg)
return error_msg
return None
def check_msg(self, msg, verbose=False):
status = msg['content']['status']
if status == 'ok':
if verbose:
print("Execution succeeded.")
elif status == 'error':
for line in msg['content']['traceback']:
if verbose:
print(line)
def shutdown(self):
# Shutdown the backend kernel
self.kernel_manager.shutdown_kernel()
print("Backend kernel shutdown.")
# Shutdown the code kernel
self.kernel.shutdown()
print("Code kernel shutdown.")
def restart(self):
# Restart the backend kernel
self.kernel_manager.restart_kernel()
# print("Backend kernel restarted.")
def interrupt(self):
# Interrupt the backend kernel
self.kernel_manager.interrupt_kernel()
# print("Backend kernel interrupted.")
def is_alive(self):
return self.kernel.is_alive()
def b64_2_img(data):
buff = BytesIO(base64.b64decode(data))
return Image.open(buff)
def clean_ansi_codes(input_string):
ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', input_string)
def execute(code, kernel: CodeKernel) -> tuple[str, Union[str, Image.Image]]:
res = ""
res_type = None
code = code.replace("<|observation|>", "")
code = code.replace("<|assistant|>interpreter", "")
code = code.replace("<|assistant|>", "")
code = code.replace("<|user|>", "")
code = code.replace("<|system|>", "")
msg, output = kernel.execute(code)
if msg['metadata']['status'] == "timeout":
return res_type, 'Timed out'
elif msg['metadata']['status'] == 'error':
return res_type, clean_ansi_codes('\n'.join(kernel.get_error_msg(msg, verbose=True)))
if 'text' in output:
res_type = "text"
res = output['text']
elif 'data' in output:
for key in output['data']:
if 'image/png' in key:
res_type = "image"
res = output['data'][key]
break
elif 'text/plain' in key:
res_type = "text"
res = output['data'][key]
if res_type == "image":
return res_type, b64_2_img(res)
elif res_type == "text" or res_type == "traceback":
res = res
return res_type, res
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]

186
base/chatglm/comfyUI_api.py Normal file
View File

@@ -0,0 +1,186 @@
# This is an example that uses the websockets api to know when a prompt execution is done
# Once the prompt execution is done it downloads the images using the /history endpoint
import io
import json
import random
import urllib
import uuid
import requests
# NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import websocket
from PIL import Image
class ComfyUIApi():
def __init__(self, server_address="127.0.0.1:8188"):
self.server_address = server_address
self.client_id = str(uuid.uuid4())
self.ws = websocket.WebSocket()
self.ws.connect(
"ws://{}/ws?clientId={}".format(server_address, self.client_id))
def queue_prompt(self, prompt):
p = {"prompt": prompt, "client_id": self.client_id}
data = json.dumps(p).encode('utf-8')
req = requests.post(
"http://{}/prompt".format(self.server_address), data=data)
print(req.text)
return json.loads(req.text)
def get_image(self, filename, subfolder, folder_type):
data = {"filename": filename,
"subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with requests.get("http://{}/view?{}".format(self.server_address, url_values)) as response:
image = Image.open(io.BytesIO(response.content))
return image
def get_image_url(self, filename, subfolder, folder_type):
data = {"filename": filename,
"subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
return "http://{}/view?{}".format(self.server_address, url_values)
def get_history(self, prompt_id):
with requests.get("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
return json.loads(response.text)
def get_images(self, prompt, isUrl=False):
prompt_id = self.queue_prompt(prompt)['prompt_id']
output_images = []
while True:
out = self.ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break # Execution is done
else:
continue # previews are binary data
history = self.get_history(prompt_id)[prompt_id]
for o in history['outputs']:
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
if 'images' in node_output:
for image in node_output['images']:
image_data = self.get_image_url(image['filename'], image['subfolder'], image['type']) if isUrl else self.get_image(
image['filename'], image['subfolder'], image['type'])
image['image'] = image_data
output_images.append(image)
return output_images
prompt_text = """
{
"3": {
"class_type": "KSampler",
"inputs": {
"cfg": 8,
"denoise": 1,
"latent_image": [
"5",
0
],
"model": [
"4",
0
],
"negative": [
"7",
0
],
"positive": [
"6",
0
],
"sampler_name": "euler",
"scheduler": "normal",
"seed": 8566257,
"steps": 20
}
},
"4": {
"class_type": "CheckpointLoaderSimple",
"inputs": {
"ckpt_name": "chilloutmix_NiPrunedFp32Fix.safetensors"
}
},
"5": {
"class_type": "EmptyLatentImage",
"inputs": {
"batch_size": 1,
"height": 512,
"width": 512
}
},
"6": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": [
"4",
1
],
"text": "masterpiece best quality girl"
}
},
"7": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": [
"4",
1
],
"text": "bad hands"
}
},
"8": {
"class_type": "VAEDecode",
"inputs": {
"samples": [
"3",
0
],
"vae": [
"4",
2
]
}
},
"9": {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": "ComfyUI",
"images": [
"8",
0
]
}
}
}
"""
if __name__ == '__main__':
prompt = json.loads(prompt_text)
# set the text prompt for our positive CLIPTextEncode
prompt["6"]["inputs"]["text"] = "masterpiece best quality man"
# set the seed for our KSampler node
prompt["3"]["inputs"]["seed"] = ''.join(
random.sample('123456789012345678901234567890', 14))
cfui = ComfyUIApi()
images = cfui.get_images(prompt)
# Commented out code to display the output images:
for node_id in images:
for image_data in images[node_id]:
import io
from PIL import Image
image = Image.open(io.BytesIO(image_data))
image.show()

View File

@@ -0,0 +1,167 @@
import inspect
import json
import random
import re
import traceback
from copy import deepcopy
from datetime import datetime
from types import GenericAlias
from typing import Annotated, get_origin
from base.chatglm.comfyUI_api import ComfyUIApi
from base.func_news import News
from zhdate import ZhDate
_TOOL_HOOKS = {}
_TOOL_DESCRIPTIONS = {}
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]
def register_tool(func: callable):
tool_name = func.__name__
tool_description = inspect.getdoc(func).strip()
python_params = inspect.signature(func).parameters
tool_params = []
for name, param in python_params.items():
annotation = param.annotation
if annotation is inspect.Parameter.empty:
raise TypeError(f"Parameter `{name}` missing type annotation")
if get_origin(annotation) != Annotated:
raise TypeError(
f"Annotation type for `{name}` must be typing.Annotated")
typ, (description, required) = annotation.__origin__, annotation.__metadata__
typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__
if not isinstance(description, str):
raise TypeError(f"Description for `{name}` must be a string")
if not isinstance(required, bool):
raise TypeError(f"Required for `{name}` must be a bool")
tool_params.append({
"name": name,
"description": description,
"type": typ,
"required": required
})
tool_def = {
"name": tool_name,
"description": tool_description,
"params": tool_params
}
# print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
return func
def dispatch_tool(tool_name: str, tool_params: dict) -> str:
if tool_name not in _TOOL_HOOKS:
return f"Tool `{tool_name}` not found. Please use a provided tool."
tool_call = _TOOL_HOOKS[tool_name]
try:
ret = tool_call(**tool_params)
except BaseException:
ret = traceback.format_exc()
return ret
def get_tools() -> dict:
return deepcopy(_TOOL_DESCRIPTIONS)
# Tool Definitions
# @register_tool
# def random_number_generator(
# seed: Annotated[int, 'The random seed used by the generator', True],
# range: Annotated[tuple[int, int], 'The range of the generated numbers', True],
# ) -> int:
# """
# Generates a random number x, s.t. range[0] <= x < range[1]
# """
# if not isinstance(seed, int):
# raise TypeError("Seed must be an integer")
# if not isinstance(range, tuple):
# raise TypeError("Range must be a tuple")
# if not isinstance(range[0], int) or not isinstance(range[1], int):
# raise TypeError("Range must be a tuple of integers")
# import random
# return random.Random(seed).randint(*range)
@register_tool
def get_weather(
city_name: Annotated[str, 'The name of the city to be queried', True],
) -> str:
"""
Get the current weather for `city_name`
"""
if not isinstance(city_name, str):
raise TypeError("City name must be a string")
key_selection = {
"current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
}
import requests
try:
resp = requests.get(f"https://wttr.in/{city_name}?format=j1")
resp.raise_for_status()
resp = resp.json()
ret = {k: {_v: resp[k][0][_v] for _v in v}
for k, v in key_selection.items()}
except BaseException:
import traceback
ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
return str(ret)
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("chatglm\\base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed'] = ''.join(
random.sample('123456789012345678901234567890', 14))
# 模型名称
data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors'
data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词
# data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']}
@register_tool
def get_news() -> str:
'''
获取最新新闻
'''
news = News()
return news.get_important_news()
@register_tool
def get_time() -> str:
'''
获取当前日期,时间,农历日期,星期几
'''
time = datetime.now()
date2 = ZhDate.from_datetime(time)
week_list = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
return '{} {} {}'.format(time.strftime("%Y年%m月%d%H:%M:%S"), week_list[time.weekday()], '农历:' + date2.chinese())
if __name__ == "__main__":
print(dispatch_tool("get_weather", {"city_name": "beijing"}))
print(get_tools())

30903
base/chengyu.csv Normal file

File diff suppressed because it is too large Load Diff

190
base/func_chatglm.py Normal file
View File

@@ -0,0 +1,190 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import random
from datetime import datetime
from typing import Optional
import openai
from base.chatglm.code_kernel import CodeKernel, execute
from base.chatglm.tool_registry import dispatch_tool, extract_code, get_tools
from wcferry import Wcf
functions = get_tools()
class ChatGLM:
def __init__(self, config={}, wcf: Optional[Wcf] = None, max_retry=5) -> None:
openai.api_key = config.get("key", "empty")
# 自己搭建或第三方代理的接口
openai.api_base = config["api"]
proxy = config.get("proxy")
if proxy:
openai.proxy = {"http": proxy, "https": proxy}
self.conversation_list = {}
self.chat_type = {}
self.max_retry = max_retry
self.wcf = wcf
self.filePath = config["file_path"]
self.kernel = CodeKernel()
self.system_content_msg = {"chat": [{"role": "system", "content": config["prompt"]}],
"tool": [{"role": "system", "content": "Answer the following questions as best as you can. You have access to the following tools:"}],
"code": [{"role": "system", "content": "你是一位智能AI助手你叫ChatGLM你连接着一台电脑但请注意不能联网。在使用Python解决任务时你可以运行代码并得到结果如果运行结果有错误你需要尽可能对代码进行改进。你可以处理用户上传到电脑上的文件文件默认存储路径是{}".format(self.filePath)}]}
def __repr__(self):
return 'ChatGLM'
@staticmethod
def value_check(conf: dict) -> bool:
if conf:
if conf.get("api") and conf.get("prompt") and conf.get("file_path"):
return True
return False
def get_answer(self, question: str, wxid: str) -> str:
# wxid或者roomid,个人时为微信id群消息时为群id
if '#帮助' == question:
return '本助手有三种模式,#聊天模式 = #1 #工具模式 = #2 #代码模式 = #3 , #清除模式会话 = #4 , #清除全部会话 = #5 可用发送#对应模式 或者 #编号 进行切换'
elif '#聊天模式' == question or '#1' == question:
self.chat_type[wxid] = 'chat'
return '已切换#聊天模式'
elif '#工具模式' == question or '#2' == question:
self.chat_type[wxid] = 'tool'
return '已切换#工具模式 \n工具有:查看天气,日期,新闻,comfyUI文生图。例如\n帮我生成一张小鸟的图片,提示词必须是英文'
elif '#代码模式' == question or '#3' == question:
self.chat_type[wxid] = 'code'
return '已切换#代码模式 \n代码模式可以用于写python代码例如\n用python画一个爱心'
elif '#清除模式会话' == question or '#4' == question:
self.conversation_list[wxid][self.chat_type[wxid]
] = self.system_content_msg[self.chat_type[wxid]]
return '已清除'
elif '#清除全部会话' == question or '#5' == question:
self.conversation_list[wxid] = self.system_content_msg
return '已清除'
self.updateMessage(wxid, question, "user")
try:
params = dict(model="chatglm3", temperature=1.0,
messages=self.conversation_list[wxid][self.chat_type[wxid]], stream=False)
if 'tool' == self.chat_type[wxid]:
params["functions"] = functions
response = openai.ChatCompletion.create(**params)
for _ in range(self.max_retry):
if response.choices[0].message.get("function_call"):
function_call = response.choices[0].message.function_call
print(
f"Function Call Response: {function_call.to_dict_recursive()}")
function_args = json.loads(function_call.arguments)
observation = dispatch_tool(
function_call.name, function_args)
if isinstance(observation, dict):
res_type = observation['res_type'] if 'res_type' in observation else 'text'
res = observation['res'] if 'res_type' in observation else str(
observation)
if res_type == 'image':
filename = observation['filename']
filePath = os.path.join(self.filePath, filename)
res.save(filePath)
self.wcf and self.wcf.send_image(filePath, wxid)
tool_response = '[Image]' if res_type == 'image' else res
else:
tool_response = observation if isinstance(
observation, str) else str(observation)
print(f"Tool Call Response: {tool_response}")
params["messages"].append(response.choices[0].message)
params["messages"].append(
{
"role": "function",
"name": function_call.name,
"content": tool_response, # 调用函数返回结果
}
)
self.updateMessage(wxid, tool_response, "function")
response = openai.ChatCompletion.create(**params)
elif response.choices[0].message.content.find('interpreter') != -1:
output_text = response.choices[0].message.content
code = extract_code(output_text)
self.wcf and self.wcf.send_text('代码如下:\n' + code, wxid)
self.wcf and self.wcf.send_text('执行代码...', wxid)
try:
res_type, res = execute(code, self.kernel)
except Exception as e:
rsp = f'代码执行错误: {e}'
break
if res_type == 'image':
filename = '{}.png'.format(''.join(random.sample(
'abcdefghijklmnopqrstuvwxyz1234567890', 8)))
filePath = os.path.join(self.filePath, filename)
res.save(filePath)
self.wcf and self.wcf.send_image(filePath, wxid)
else:
self.wcf and self.wcf.send_text("执行结果:\n" + res, wxid)
tool_response = '[Image]' if res_type == 'image' else res
print("Received:", res_type, res)
params["messages"].append(response.choices[0].message)
params["messages"].append(
{
"role": "function",
"name": "interpreter",
"content": tool_response, # 调用函数返回结果
}
)
self.updateMessage(wxid, tool_response, "function")
response = openai.ChatCompletion.create(**params)
else:
rsp = response.choices[0].message.content
break
self.updateMessage(wxid, rsp, "assistant")
except Exception as e0:
rsp = "发生未知错误:" + str(e0)
return rsp
def updateMessage(self, wxid: str, question: str, role: str) -> None:
now_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 初始化聊天记录,组装系统信息
if wxid not in self.conversation_list.keys():
self.conversation_list[wxid] = self.system_content_msg
if wxid not in self.chat_type.keys():
self.chat_type[wxid] = 'chat'
# 当前问题
content_question_ = {"role": role, "content": question}
self.conversation_list[wxid][self.chat_type[wxid]].append(
content_question_)
# 只存储10条记录超过滚动清除
i = len(self.conversation_list[wxid][self.chat_type[wxid]])
if i > 10:
print("滚动清除微信记录:" + wxid)
# 删除多余的记录,倒着删,且跳过第一个的系统消息
del self.conversation_list[wxid][self.chat_type[wxid]][1]
if __name__ == "__main__":
from configuration import Config
config = Config().CHATGLM
if not config:
exit(0)
chat = ChatGLM(config)
while True:
q = input(">>> ")
try:
time_start = datetime.now() # 记录开始时间
print(chat.get_answer(q, "wxid"))
time_end = datetime.now() # 记录结束时间
# 计算的时间差为程序的执行时间,单位为秒/s
print(f"{round((time_end - time_start).total_seconds(), 2)}s")
except Exception as e:
print(e)

104
base/func_chatgpt.py Normal file
View File

@@ -0,0 +1,104 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
import openai
class ChatGPT:
def __init__(self, conf: dict) -> None:
openai.api_key = conf["key"]
# 自己搭建或第三方代理的接口
openai.api_base = conf["api"]
proxy = conf["proxy"]
if proxy:
openai.proxy = {"http": proxy, "https": proxy}
self.conversation_list = {}
self.system_content_msg = {"role": "system", "content": conf["prompt"]}
def __repr__(self):
return 'ChatGPT'
@staticmethod
def value_check(conf: dict) -> bool:
if conf:
if conf.get("key") and conf.get("api") and conf.get("prompt"):
return True
return False
def get_answer(self, question: str, wxid: str) -> str:
# wxid或者roomid,个人时为微信id群消息时为群id
self.updateMessage(wxid, question, "user")
try:
ret = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=self.conversation_list[wxid],
temperature=0.2
)
rsp = ret["choices"][0]["message"]["content"]
rsp = rsp[2:] if rsp.startswith("\n\n") else rsp
rsp = rsp.replace("\n\n", "\n")
self.updateMessage(wxid, rsp, "assistant")
except openai.error.AuthenticationError as e3:
rsp = "OpenAI API 认证失败,请检查 API 密钥是否正确"
except openai.error.APIConnectionError as e2:
rsp = "无法连接到 OpenAI API请检查网络连接"
except openai.error.APIError as e1:
rsp = "OpenAI API 返回了错误:" + str(e1)
except Exception as e0:
rsp = "发生未知错误:" + str(e0)
return rsp
def updateMessage(self, wxid: str, question: str, role: str) -> None:
now_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time_mk = "当需要回答时间时请直接参考回复:"
# 初始化聊天记录,组装系统信息
if wxid not in self.conversation_list.keys():
question_ = [
self.system_content_msg,
{"role": "system", "content": "" + time_mk + now_time}
]
self.conversation_list[wxid] = question_
# 当前问题
content_question_ = {"role": role, "content": question}
self.conversation_list[wxid].append(content_question_)
for cont in self.conversation_list[wxid]:
if cont["role"] != "system":
continue
if cont["content"].startswith(time_mk):
cont["content"] = time_mk + now_time
# 只存储10条记录超过滚动清除
i = len(self.conversation_list[wxid])
if i > 10:
print("滚动清除微信记录:" + wxid)
# 删除多余的记录,倒着删,且跳过第一个的系统消息
del self.conversation_list[wxid][1]
if __name__ == "__main__":
from configuration import Config
config = Config().CHATGPT
if not config:
exit(0)
chat = ChatGPT(config)
while True:
q = input(">>> ")
try:
time_start = datetime.now() # 记录开始时间
print(chat.get_answer(q, "wxid"))
time_end = datetime.now() # 记录结束时间
print(f"{round((time_end - time_start).total_seconds(), 2)}s") # 计算的时间差为程序的执行时间,单位为秒/s
except Exception as e:
print(e)

79
base/func_chengyu.py Normal file
View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
import os
import random
import pandas as pd
class Chengyu(object):
def __init__(self) -> None:
root = os.path.dirname(os.path.abspath(__file__))
self.df = pd.read_csv(f"{root}/chengyu.csv", delimiter="\t")
self.cys, self.zis, self.yins = self._build_data()
def _build_data(self):
df = self.df.copy()
df["shouzi"] = df["chengyu"].apply(lambda x: x[0])
df["mozi"] = df["chengyu"].apply(lambda x: x[-1])
df["shouyin"] = df["pingyin"].apply(lambda x: x.split(" ")[0])
df["moyin"] = df["pingyin"].apply(lambda x: x.split(" ")[-1])
cys = dict(zip(df["chengyu"], df["moyin"]))
zis = df.groupby("shouzi").agg({"chengyu": set})["chengyu"].to_dict()
yins = df.groupby("shouyin").agg({"chengyu": set})["chengyu"].to_dict()
return cys, zis, yins
def isChengyu(self, cy: str) -> bool:
return self.cys.get(cy, None) is not None
def getNext(self, cy: str, tongyin: bool = True) -> str:
"""获取下一个成语
cy: 当前成语
tongyin: 是否允许同音字
"""
zi = cy[-1]
ansers = list(self.zis.get(zi, {}))
try:
ansers.remove(cy) # 移除当前成语
except Exception as e:
pass # Just ignore...
if ansers:
return random.choice(ansers)
# 如果找不到同字,允许同音
if tongyin:
yin = self.cys.get(cy)
ansers = list(self.yins.get(yin, {}))
try:
ansers.remove(cy) # 移除当前成语
except Exception as e:
pass # Just ignore...
if ansers:
return random.choice(ansers)
return None
def getMeaning(self, cy: str) -> str:
ress = self.df[self.df["chengyu"] == cy].to_dict(orient="records")
if ress:
res = ress[0]
rsp = res["chengyu"] + "\n" + res["pingyin"] + "\n" + res["jieshi"]
if res["chuchu"] and res["chuchu"] != "":
rsp += "\n出处:" + res["chuchu"]
if res["lizi"] and res["lizi"] != "":
rsp += "\n例子:" + res["lizi"]
return rsp
return None
cy = Chengyu()
if __name__ == "__main__":
answer = cy.getNext("便宜行事")
print(answer)

51
base/func_news.py Normal file
View File

@@ -0,0 +1,51 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import re
import logging
import time
from datetime import datetime
import requests
from lxml import etree
class News(object):
def __init__(self) -> None:
self.LOG = logging.getLogger(__name__)
self.week = {0: "周一", 1: "周二", 2: "周三", 3: "周四", 4: "周五", 5: "周六", 6: "周日"}
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/110.0"}
def get_important_news(self):
url = "https://www.cls.cn/api/sw?app=CailianpressWeb&os=web&sv=7.7.5"
data = {"type": "telegram", "keyword": "你需要知道的隔夜全球要闻", "page": 0,
"rn": 1, "os": "web", "sv": "7.7.5", "app": "CailianpressWeb"}
try:
rsp = requests.post(url=url, headers=self.headers, data=data)
data = json.loads(rsp.text)["data"]["telegram"]["data"][0]
news = data["descr"]
timestamp = data["time"]
ts = time.localtime(timestamp)
weekday_news = datetime(*ts[:6]).weekday()
except Exception as e:
self.LOG.error(e)
return ""
weekday_now = datetime.now().weekday()
if weekday_news != weekday_now:
return "" # 旧闻观察发现周二周六早晨6点半左右发布
fmt_time = time.strftime("%Y年%m月%d", ts)
news = re.sub(r"(\d{1,2}、)", r"\n\1", news)
fmt_news = "".join(etree.HTML(news).xpath(" // text()"))
fmt_news = re.sub(r"周[一|二|三|四|五|六|日]你需要知道的", r"", fmt_news)
return f"{fmt_time} {self.week[weekday_news]}\n{fmt_news}"
if __name__ == "__main__":
news = News()
print(news.get_important_news())

View File

@@ -0,0 +1,61 @@
import calendar
import datetime
from chinese_calendar import is_workday
from robot import Robot
class ReportReminder:
@staticmethod
def remind(robot: Robot) -> None:
receivers = robot.config.REPORT_REMINDERS
if not receivers:
receivers = ["filehelper"]
# 日报周报月报提醒
for receiver in receivers:
today = datetime.datetime.now().date()
# 如果是非工作日
if not is_workday(today):
robot.sendTextMsg("休息日快乐", receiver)
# 如果是工作日
if is_workday(today):
robot.sendTextMsg("该发日报啦", receiver)
# 如果是本周最后一个工作日
if ReportReminder.last_work_day_of_week(today) == today:
robot.sendTextMsg("该发周报啦", receiver)
# 如果本日是本月最后一整周的最后一个工作日:
if ReportReminder.last_work_friday_of_month(today) == today:
robot.sendTextMsg("该发月报啦", receiver)
# 计算本月最后一个周的最后一个工作日
@staticmethod
def last_work_friday_of_month(d: datetime.date) -> datetime.date:
days_in_month = calendar.monthrange(d.year, d.month)[1]
weekday = calendar.weekday(d.year, d.month, days_in_month)
if weekday == 4:
last_friday_of_month = datetime.date(
d.year, d.month, days_in_month)
else:
if weekday >= 5:
last_friday_of_month = datetime.date(d.year, d.month, days_in_month) - \
datetime.timedelta(days=(weekday - 4))
else:
last_friday_of_month = datetime.date(d.year, d.month, days_in_month) - \
datetime.timedelta(days=(weekday + 3))
while not is_workday(last_friday_of_month):
last_friday_of_month = last_friday_of_month - datetime.timedelta(days=1)
return last_friday_of_month
# 计算本周最后一个工作日
@staticmethod
def last_work_day_of_week(d: datetime.date) -> datetime.date:
weekday = calendar.weekday(d.year, d.month, d.day)
last_work_day_of_week = datetime.date(
d.year, d.month, d.day) + datetime.timedelta(days=(6 - weekday))
while not is_workday(last_work_day_of_week):
last_work_day_of_week = last_work_day_of_week - \
datetime.timedelta(days=1)
return last_work_day_of_week

49
base/func_tigerbot.py Normal file
View File

@@ -0,0 +1,49 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import requests
from random import randint
class TigerBot:
def __init__(self, tbconf=None) -> None:
self.LOG = logging.getLogger(__file__)
self.tburl = "https://api.tigerbot.com/bot-service/ai_service/gpt"
self.tbheaders = {"Authorization": "Bearer " + tbconf["key"]}
self.tbmodel = tbconf["model"]
self.fallback = ["", "快滚", "赶紧滚"]
def __repr__(self):
return 'TigerBot'
@staticmethod
def value_check(conf: dict) -> bool:
if conf:
return all(conf.values())
return False
def get_answer(self, msg: str, sender: str = None) -> str:
payload = {
"text": msg,
"modelVersion": self.tbmodel
}
rsp = ""
try:
rsp = requests.post(self.tburl, headers=self.tbheaders, json=payload).json()
rsp = rsp["data"]["result"][0]
except Exception as e:
self.LOG.error(f"{e}: {payload}\n{rsp}")
idx = randint(0, len(self.fallback) - 1)
rsp = self.fallback[idx]
return rsp
if __name__ == "__main__":
from configuration import Config
c = Config()
tbot = TigerBot(c.TIGERBOT)
rsp = tbot.get_answer("你还活着?")
print(rsp)

38
base/func_xinghuo_web.py Normal file
View File

@@ -0,0 +1,38 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
from sparkdesk_web.core import SparkWeb
class XinghuoWeb:
def __init__(self, xhconf=None) -> None:
self._sparkWeb = SparkWeb(
cookie=xhconf["cookie"],
fd=xhconf["fd"],
GtToken=xhconf["GtToken"],
)
self._chat = self._sparkWeb.create_continuous_chat()
# 如果有提示词
if xhconf["prompt"]:
self._chat.chat(xhconf["prompt"])
def __repr__(self):
return 'XinghuoWeb'
@staticmethod
def value_check(conf: dict) -> bool:
if conf:
return all(conf.values())
return False
def get_answer(self, msg: str, sender: str = None) -> str:
answer = self._chat.chat(msg)
return answer
if __name__ == "__main__":
from configuration import Config
c = Config()
xinghuo = XinghuoWeb(c.XINGHUO_WEB)
rsp = xinghuo.get_answer("你还活着?")
print(rsp)