代码优化

This commit is contained in:
严浪
2023-11-13 14:29:57 +08:00
parent db3ff14c71
commit 9ca9d31fee
10 changed files with 210 additions and 143 deletions

41
chatglm/README.MD Normal file
View File

@@ -0,0 +1,41 @@
# ChatGLM3集成使用说明
* 1.需要取消配置中 chatglm 的注释, 并配置对应信息使用ChatGLM3,启用最新版ChatGLM3根目录下openai_api.py获取api地址
```yaml
# 如果要使用 chatglm取消下面的注释并填写相关内容
chatglm:
key: xxx #根据需要自己做key校验
api: http://localhost:8000/v1 # 根据自己的chatglm地址修改
proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
prompt: 你是智能聊天机器人,你叫小薇 # 根据需要对角色进行设定
file_path: F:/Pictures/temp #设定生成图片和代码使用的文件夹路径
```
* 2.修改chatglm/tool_registry.py工具里面的一下配置comfyUI地址或者根据需要自己配置一些工具,函数名上需要加@register_tool,函数里面需要叫'''函数描述'''参数需要用Annotated[str,'',True]修饰,分别是类型,参数说明,是否必填,再加->加上对应的返回类型
```python
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("func_chatglm\\base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed'] = ''.join(
random.sample('123456789012345678901234567890', 14))
# 模型名称
data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors'
data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词
# data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']}
```
* 3 使用 Code Interpreter 还需要安装 Jupyter 内核,默认名称叫chatglm3
```
ipython kernel install --name chatglm3 --user
```
如果名称需要自定义可以配置系统环境变量IPYKERNEL 或者修改 chatglm/code_kernel.py
```
IPYKERNEL = os.environ.get('IPYKERNEL', 'chatglm3')
```
* 4 启动后,发送 #帮助 可以查看 模式和常用指令

View File

@@ -5,12 +5,14 @@ from pprint import pprint
import queue
import re
from subprocess import PIPE
from typing import Dict, Union, Optional, Tuple
import jupyter_client
from PIL import Image
import time
IPYKERNEL = os.environ.get('IPYKERNEL', 'chatglm3')
class CodeKernel(object):
def __init__(self,
kernel_name='kernel',
@@ -28,16 +30,18 @@ class CodeKernel(object):
self.ipython_path = ipython_path
self.init_file_path = init_file_path
self.verbose = verbose
if python_path is None and ipython_path is None:
env = None
else:
env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path}
env = {"PATH": self.python_path + ":$PATH",
"PYTHONPATH": self.python_path}
# Initialize the backend kernel
self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL,
self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL,
connection_file=self.kernel_config_path,
exec_files=[self.init_file_path],
exec_files=[
self.init_file_path],
env=env)
if self.kernel_config_path:
self.kernel_manager.load_connection_file()
@@ -65,14 +69,15 @@ class CodeKernel(object):
io_msg_content = self.kernel.get_iopub_msg(timeout=40)['content']
while True:
msg_out = io_msg_content
### Poll the message
# Poll the message
try:
io_msg_content = self.kernel.get_iopub_msg(timeout=40)['content']
io_msg_content = self.kernel.get_iopub_msg(timeout=40)[
'content']
if 'execution_state' in io_msg_content and io_msg_content['execution_state'] == 'idle':
break
except queue.Empty:
break
return shell_msg, msg_out
except Exception as e:
print(e)
@@ -97,14 +102,14 @@ class CodeKernel(object):
return shell_msg
def get_error_msg(self, msg, verbose=False) -> str | None:
def get_error_msg(self, msg, verbose=False) -> Optional[str]:
if msg['content']['status'] == 'error':
try:
error_msg = msg['content']['traceback']
except:
except BaseException:
try:
error_msg = msg['content']['traceback'][-1].strip()
except:
except BaseException:
error_msg = "Traceback Error"
if verbose:
print("Error: ", error_msg)
@@ -141,16 +146,19 @@ class CodeKernel(object):
def is_alive(self):
return self.kernel.is_alive()
def b64_2_img(data):
buff = BytesIO(base64.b64decode(data))
return Image.open(buff)
def clean_ansi_codes(input_string):
ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', input_string)
def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]:
def execute(code, kernel: CodeKernel) -> tuple[str, Union[str, Image.Image]]:
res = ""
res_type = None
code = code.replace("<|observation|>", "")
@@ -159,12 +167,12 @@ def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]:
code = code.replace("<|user|>", "")
code = code.replace("<|system|>", "")
msg, output = kernel.execute(code)
if msg['metadata']['status'] == "timeout":
return res_type, 'Timed out'
elif msg['metadata']['status'] == 'error':
return res_type, clean_ansi_codes('\n'.join(kernel.get_error_msg(msg, verbose=True)))
if 'text' in output:
res_type = "text"
res = output['text']
@@ -177,17 +185,16 @@ def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]:
elif 'text/plain' in key:
res_type = "text"
res = output['data'][key]
if res_type == "image":
return res_type, b64_2_img(res)
elif res_type == "text" or res_type == "traceback":
res = res
return res_type, res
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]

View File

@@ -1,7 +1,8 @@
#This is an example that uses the websockets api to know when a prompt execution is done
#Once the prompt execution is done it downloads the images using the /history endpoint
# This is an example that uses the websockets api to know when a prompt execution is done
# Once the prompt execution is done it downloads the images using the /history endpoint
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
# NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import websocket
import uuid
import json
import requests
@@ -13,35 +14,39 @@ from PIL import Image
class ComfyUIApi():
def __init__(self, server_address="127.0.0.1:8188"):
self.server_address=server_address
self.client_id=str(uuid.uuid4())
self.server_address = server_address
self.client_id = str(uuid.uuid4())
self.ws = websocket.WebSocket()
self.ws.connect("ws://{}/ws?clientId={}".format(server_address, self.client_id))
self.ws.connect(
"ws://{}/ws?clientId={}".format(server_address, self.client_id))
def queue_prompt(self,prompt):
def queue_prompt(self, prompt):
p = {"prompt": prompt, "client_id": self.client_id}
data = json.dumps(p).encode('utf-8')
req = requests.post("http://{}/prompt".format(self.server_address), data=data)
req = requests.post(
"http://{}/prompt".format(self.server_address), data=data)
print(req.text)
return json.loads(req.text)
def get_image(self,filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
def get_image(self, filename, subfolder, folder_type):
data = {"filename": filename,
"subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with requests.get("http://{}/view?{}".format(self.server_address, url_values)) as response:
image = Image.open(io.BytesIO(response.content))
return image
def get_image_url(self,filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
def get_image_url(self, filename, subfolder, folder_type):
data = {"filename": filename,
"subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
return "http://{}/view?{}".format(self.server_address, url_values)
def get_history(self,prompt_id):
def get_history(self, prompt_id):
with requests.get("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
return json.loads(response.text)
def get_images(self,prompt,isUrl=False):
def get_images(self, prompt, isUrl=False):
prompt_id = self.queue_prompt(prompt)['prompt_id']
output_images = []
while True:
@@ -51,9 +56,9 @@ class ComfyUIApi():
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break #Execution is done
break # Execution is done
else:
continue #previews are binary data
continue # previews are binary data
history = self.get_history(prompt_id)[prompt_id]
for o in history['outputs']:
@@ -61,12 +66,14 @@ class ComfyUIApi():
node_output = history['outputs'][node_id]
if 'images' in node_output:
for image in node_output['images']:
image_data =self.get_image_url(image['filename'], image['subfolder'], image['type']) if isUrl else self.get_image(image['filename'], image['subfolder'], image['type'])
image['image']=image_data
image_data = self.get_image_url(image['filename'], image['subfolder'], image['type']) if isUrl else self.get_image(
image['filename'], image['subfolder'], image['type'])
image['image'] = image_data
output_images.append(image)
return output_images
prompt_text = """
{
"3": {
@@ -157,16 +164,17 @@ prompt_text = """
"""
if __name__ == '__main__':
prompt = json.loads(prompt_text)
#set the text prompt for our positive CLIPTextEncode
# set the text prompt for our positive CLIPTextEncode
prompt["6"]["inputs"]["text"] = "masterpiece best quality man"
#set the seed for our KSampler node
prompt["3"]["inputs"]["seed"] = ''.join(random.sample('123456789012345678901234567890',14))
# set the seed for our KSampler node
prompt["3"]["inputs"]["seed"] = ''.join(
random.sample('123456789012345678901234567890', 14))
cfui=ComfyUIApi()
cfui = ComfyUIApi()
images = cfui.get_images(prompt)
#Commented out code to display the output images:
# Commented out code to display the output images:
for node_id in images:
for image_data in images[node_id]:
@@ -174,4 +182,3 @@ if __name__ == '__main__':
import io
image = Image.open(io.BytesIO(image_data))
image.show()

View File

@@ -9,7 +9,7 @@ import requests
import random
import time
import re
from tool.comfyUI_api import ComfyUIApi
from chatglm.comfyUI_api import ComfyUIApi
from func_news import News
from zhdate import ZhDate
from datetime import datetime
@@ -17,6 +17,7 @@ from datetime import datetime
_TOOL_HOOKS = {}
_TOOL_DESCRIPTIONS = {}
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
@@ -33,8 +34,9 @@ def register_tool(func: callable):
if annotation is inspect.Parameter.empty:
raise TypeError(f"Parameter `{name}` missing type annotation")
if get_origin(annotation) != Annotated:
raise TypeError(f"Annotation type for `{name}` must be typing.Annotated")
raise TypeError(
f"Annotation type for `{name}` must be typing.Annotated")
typ, (description, required) = annotation.__origin__, annotation.__metadata__
typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__
if not isinstance(description, str):
@@ -54,22 +56,24 @@ def register_tool(func: callable):
"params": tool_params
}
#print("[registered tool] " + pformat(tool_def))
# print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
return func
def dispatch_tool(tool_name: str, tool_params: dict) -> str:
if tool_name not in _TOOL_HOOKS:
return f"Tool `{tool_name}` not found. Please use a provided tool."
tool_call = _TOOL_HOOKS[tool_name]
try:
ret = tool_call(**tool_params)
except:
ret = tool_call(**tool_params)
except BaseException:
ret = traceback.format_exc()
return ret
def get_tools() -> dict:
return deepcopy(_TOOL_DESCRIPTIONS)
@@ -77,7 +81,7 @@ def get_tools() -> dict:
# @register_tool
# def random_number_generator(
# seed: Annotated[int, 'The random seed used by the generator', True],
# seed: Annotated[int, 'The random seed used by the generator', True],
# range: Annotated[tuple[int, int], 'The range of the generated numbers', True],
# ) -> int:
# """
@@ -93,6 +97,7 @@ def get_tools() -> dict:
# import random
# return random.Random(seed).randint(*range)
@register_tool
def get_weather(
city_name: Annotated[str, 'The name of the city to be queried', True],
@@ -104,34 +109,39 @@ def get_weather(
raise TypeError("City name must be a string")
key_selection = {
"current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
"current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
}
import requests
try:
resp = requests.get(f"https://wttr.in/{city_name}?format=j1")
resp.raise_for_status()
resp = resp.json()
ret = {k: {_v: resp[k][0][_v] for _v in v} for k, v in key_selection.items()}
except:
ret = {k: {_v: resp[k][0][_v] for _v in v}
for k, v in key_selection.items()}
except BaseException:
import traceback
ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
return str(ret)
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("tool\\base.json", "r", encoding="utf-8") as f:
with open("chatglm\\base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed']=''.join(random.sample('123456789012345678901234567890',14))
data2['prompt']['4']['inputs']['ckpt_name']='chilloutmix_NiPrunedFp32Fix.safetensors' #模型名称
data2['prompt']['6']['inputs']['text']=prompt #正向提示词
#data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui=ComfyUIApi(server_address="127.0.0.1:8188") #根据自己comfyUI地址修改
data2['prompt']['3']['inputs']['seed'] = ''.join(
random.sample('123456789012345678901234567890', 14))
# 模型名称
data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors'
data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词
# data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res':images[0]['image'],'res_type':'image','filename':images[0]['filename']}
return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']}
@register_tool
def get_news() -> str:
@@ -141,16 +151,17 @@ def get_news() -> str:
news = News()
return news.get_important_news()
@register_tool
def get_time() -> str:
'''
获取当前日期时间农历日期星期几
'''
time=datetime.now()
time = datetime.now()
date2 = ZhDate.from_datetime(time)
week_list = ["星期一","星期二","星期三","星期四","星期五","星期六","星期日"]
return '{} {} {}'.format(time.strftime("%Y年%m月%d%H:%M:%S"),week_list[time.weekday()],'农历:'+date2.chinese())
week_list = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
return '{} {} {}'.format(time.strftime("%Y年%m月%d%H:%M:%S"), week_list[time.weekday()], '农历:' + date2.chinese())
if __name__ == "__main__":

View File

@@ -54,12 +54,12 @@ report_reminder:
# proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
# prompt: 你是智能聊天机器人你叫wcferry # 根据需要对角色进行设定
chatglm:
key: key
api: http://localhost:8000/v1 # 根据自己的chatglm地址修改
proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
prompt: 你是智能聊天机器人,你叫小薇 # 根据需要对角色进行设定
file_path: F:/Pictures/temp #设定生成图片和代码使用的文件夹路径
#chatglm:
# key: key #暂时没有
# api: http://localhost:8000/v1 # 根据自己的chatglm地址修改
# proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
# prompt: 你是智能聊天机器人,你叫小薇 # 根据需要对角色进行设定
# file_path: F:/Pictures/temp #设定生成图片和代码使用的文件夹路径
tigerbot:
key: key

View File

@@ -7,85 +7,84 @@ import openai
import json
import os
import random
from tool.tool_registry import get_tools, dispatch_tool,extract_code
from tool.code_kernel import execute,CodeKernel
from wcferry import Wcf, WxMsg
from chatglm.tool_registry import get_tools, dispatch_tool, extract_code
from chatglm.code_kernel import execute, CodeKernel
from typing import Dict, Union, Optional, Tuple
from wcferry import Wcf
functions = get_tools()
functions=get_tools()
class ChatGLM():
def __init__(self, wcf: Wcf, config={},max_retry=5) -> None:
openai.api_key = config.get('key')
def __init__(self, config={}, wcf: Optional[Wcf] = None, max_retry=5) -> None:
openai.api_key = config.get('key', 'XXX')
# 自己搭建或第三方代理的接口
openai.api_base = config.get('api')
if config.get('proxy',None):
openai.proxy = {"http": config.get('proxy',None), "https": config.get('proxy',None)}
openai.api_base = config.get('api', 'http://localhost:8000/v1')
if config.get('proxy', None):
openai.proxy = {"http": config.get(
'proxy', None), "https": config.get('proxy', None)}
self.conversation_list = {}
self.chat_type={}
self.max_retry=max_retry
self.wcf=wcf
self.filePath=config.get('file_path')
self.chat_type = {}
self.max_retry = max_retry
self.wcf = wcf
self.filePath = config.get('file_path', 'temp')
self.kernel = CodeKernel()
self.system_content_msg = {"chat":[{"role": "system", "content": config.get('prompt')}],
"tool":[{"role": "system", "content": "Answer the following questions as best as you can. You have access to the following tools:"}],
"code":[{"role": "system", "content": "你是一位智能AI助手你叫ChatGLM你连接着一台电脑但请注意不能联网。在使用Python解决任务时你可以运行代码并得到结果如果运行结果有错误你需要尽可能对代码进行改进。你可以处理用户上传到电脑上的文件文件默认存储路径是{}".format(self.filePath)}]}
code0='''
import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(-1, 1, 50)
y = x * x + 1
plt.plot(x, y)
plt.show()
'''
res_type, res = execute(code0, self.kernel) #第一次画图不返回图片问题
print(res_type, res)
self.system_content_msg = {"chat": [{"role": "system", "content": config.get('prompt', '你是智能聊天机器人,你叫小薇')}],
"tool": [{"role": "system", "content": "Answer the following questions as best as you can. You have access to the following tools:"}],
"code": [{"role": "system", "content": "你是一位智能AI助手你叫ChatGLM你连接着一台电脑但请注意不能联网。在使用Python解决任务时你可以运行代码并得到结果如果运行结果有错误你需要尽可能对代码进行改进。你可以处理用户上传到电脑上的文件文件默认存储路径是{}".format(self.filePath)}]}
def get_answer(self, question: str, wxid: str) -> str:
# wxid或者roomid,个人时为微信id群消息时为群id
if '#帮助'==question:
if '#帮助' == question:
return '本助手有三种模式,#聊天模式 = #1 #工具模式 = #2 #代码模式 = #3 , #清除模式会话 = #4 , #清除全部会话 = #5 可用发送#对应模式 或者 #编号 进行切换'
elif '#聊天模式'==question or '#1'==question:
self.chat_type[wxid]='chat'
elif '#聊天模式' == question or '#1' == question:
self.chat_type[wxid] = 'chat'
return '已切换#聊天模式'
elif '#工具模式'==question or '#2'==question:
self.chat_type[wxid]='tool'
elif '#工具模式' == question or '#2' == question:
self.chat_type[wxid] = 'tool'
return '已切换#工具模式 \n工具有:查看天气,日期,新闻,comfyUI文生图。例如\n帮我生成一张小鸟的图片,提示词必须是英文'
elif '#代码模式'==question or '#3'==question:
self.chat_type[wxid]='code'
elif '#代码模式' == question or '#3' == question:
self.chat_type[wxid] = 'code'
return '已切换#代码模式 \n代码模式可以用于写python代码例如\n用python画一个爱心'
elif '#清除模式会话'==question or '#4'==question:
self.conversation_list[wxid][self.chat_type[wxid]]=self.system_content_msg[self.chat_type[wxid]]
elif '#清除模式会话' == question or '#4' == question:
self.conversation_list[wxid][self.chat_type[wxid]
] = self.system_content_msg[self.chat_type[wxid]]
return '已清除'
elif '#清除全部会话'==question or '#5'==question:
self.conversation_list[wxid]=self.system_content_msg
elif '#清除全部会话' == question or '#5' == question:
self.conversation_list[wxid] = self.system_content_msg
return '已清除'
self.updateMessage(wxid, question, "user")
try:
params = dict(model="chatglm3",temperature=1.0, messages=self.conversation_list[wxid][self.chat_type[wxid]], stream=False)
if 'tool'==self.chat_type[wxid]:
params = dict(model="chatglm3", temperature=1.0,
messages=self.conversation_list[wxid][self.chat_type[wxid]], stream=False)
if 'tool' == self.chat_type[wxid]:
params["functions"] = functions
response = openai.ChatCompletion.create(**params)
for _ in range(self.max_retry):
if response.choices[0].message.get("function_call"):
function_call = response.choices[0].message.function_call
print(f"Function Call Response: {function_call.to_dict_recursive()}")
print(
f"Function Call Response: {function_call.to_dict_recursive()}")
function_args = json.loads(function_call.arguments)
observation = dispatch_tool(function_call.name, function_args)
if isinstance(observation,dict):
res_type=observation['res_type'] if 'res_type' in observation else 'text'
res=observation['res'] if 'res_type' in observation else str(observation )
observation = dispatch_tool(
function_call.name, function_args)
if isinstance(observation, dict):
res_type = observation['res_type'] if 'res_type' in observation else 'text'
res = observation['res'] if 'res_type' in observation else str(
observation)
if res_type == 'image':
filename= observation['filename']
filePath=os.path.join(self.filePath,filename)
filename = observation['filename']
filePath = os.path.join(self.filePath, filename)
res.save(filePath)
self.wcf.send_image(filePath,wxid)
tool_response='[Image]' if res_type == 'image' else res
self.wcf and self.wcf.send_image(filePath, wxid)
tool_response = '[Image]' if res_type == 'image' else res
else:
tool_response=observation if isinstance(observation,str) else str(observation)
tool_response = observation if isinstance(
observation, str) else str(observation)
print(f"Tool Call Response: {tool_response}")
params["messages"].append(response.choices[0].message)
@@ -98,24 +97,25 @@ plt.show()
)
self.updateMessage(wxid, tool_response, "function")
response = openai.ChatCompletion.create(**params)
elif response.choices[0].message.content.find('interpreter')!=-1:
output_text=response.choices[0].message.content
elif response.choices[0].message.content.find('interpreter') != -1:
output_text = response.choices[0].message.content
code = extract_code(output_text)
self.wcf.send_text('代码如下:\n'+code,wxid)
self.wcf.send_text('执行代码...',wxid)
self.wcf and self.wcf.send_text('代码如下:\n' + code, wxid)
self.wcf and self.wcf.send_text('执行代码...', wxid)
try:
res_type, res = execute(code, self.kernel)
except Exception as e:
rsp=f'代码执行错误: {e}'
rsp = f'代码执行错误: {e}'
break
if res_type == 'image':
filename= '{}.png'.format(''.join(random.sample('abcdefghijklmnopqrstuvwxyz1234567890',8)))
filePath=os.path.join(self.filePath,filename)
filename = '{}.png'.format(''.join(random.sample(
'abcdefghijklmnopqrstuvwxyz1234567890', 8)))
filePath = os.path.join(self.filePath, filename)
res.save(filePath)
self.wcf.send_image(filePath,wxid)
self.wcf and self.wcf.send_image(filePath, wxid)
else:
self.wcf.send_text("执行结果:\n"+res,wxid)
tool_response='[Image]' if res_type == 'image' else res
self.wcf and self.wcf.send_text("执行结果:\n" + res, wxid)
tool_response = '[Image]' if res_type == 'image' else res
print("Received:", res_type, res)
params["messages"].append(response.choices[0].message)
params["messages"].append(
@@ -144,11 +144,12 @@ plt.show()
if wxid not in self.conversation_list.keys():
self.conversation_list[wxid] = self.system_content_msg
if wxid not in self.chat_type.keys():
self.chat_type[wxid]='chat'
self.chat_type[wxid] = 'chat'
# 当前问题
content_question_ = {"role": role, "content": question}
self.conversation_list[wxid][self.chat_type[wxid]].append(content_question_)
self.conversation_list[wxid][self.chat_type[wxid]].append(
content_question_)
# 只存储10条记录超过滚动清除
i = len(self.conversation_list[wxid][self.chat_type[wxid]])
@@ -160,16 +161,11 @@ plt.show()
if __name__ == "__main__":
from configuration import Config
config = Config().CHATGPT
config = Config().CHATGLM
if not config:
exit(0)
key = config.get("key")
api = config.get("api")
proxy = config.get("proxy")
prompt = config.get("prompt")
chat = ChatGLM(key, api, proxy, prompt)
chat = ChatGLM(config)
while True:
q = input(">>> ")
@@ -178,6 +174,7 @@ if __name__ == "__main__":
print(chat.get_answer(q, "wxid"))
time_end = datetime.now() # 记录结束时间
print(f"{round((time_end - time_start).total_seconds(), 2)}s") # 计算的时间差为程序的执行时间,单位为秒/s
# 计算的时间差为程序的执行时间,单位为秒/s
print(f"{round((time_end - time_start).total_seconds(), 2)}s")
except Exception as e:
print(e)

View File

@@ -8,3 +8,7 @@ schedule
pyhandytools
sparkdesk-api==1.3.0
wcferry>=39.0.3.0
websocket
pillow
jupyter_client
zhdate

View File

@@ -38,7 +38,7 @@ class Robot(Job):
elif self.config.XINGHUO_WEB:
self.chat = XinghuoWeb(self.config.XINGHUO_WEB)
elif self.config.CHATGLM:
self.chat = ChatGLM(wcf, self.config.CHATGLM)
self.chat = ChatGLM(self.config.CHATGLM,wcf)
else:
self.chat = None