chatglm3接入,可实现函数调用,代码执行等功能

This commit is contained in:
严浪
2023-11-12 13:41:01 +08:00
parent 51dac0eab1
commit 287e7a2592
8 changed files with 627 additions and 0 deletions

View File

@@ -54,6 +54,13 @@ report_reminder:
# proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
# prompt: 你是智能聊天机器人你叫wcferry # 根据需要对角色进行设定
chatglm:
key: key
api: http://localhost:8000/v1 # 根据自己的chatglm地址修改
proxy: # 如果你在国内你可能需要魔法大概长这样http://域名或者IP地址:端口号
prompt: 你是智能聊天机器人,你叫小薇 # 根据需要对角色进行设定
file_path: F:/Pictures/temp #设定生成图片和代码使用的文件夹路径
tigerbot:
key: key
model: tigerbot-7b-sft

View File

@@ -33,3 +33,4 @@ class Config(object):
self.REPORT_REMINDERS = yconfig["report_reminder"]["receivers"]
self.TIGERBOT = yconfig.get("tigerbot")
self.XINGHUO_WEB = yconfig.get("xinghuo_web")
self.CHATGLM = yconfig.get("chatglm")

View File

@@ -11,6 +11,7 @@ from wcferry import Wcf, WxMsg
from configuration import Config
from func_chatgpt import ChatGPT
from func_chatglm import ChatGLM
from func_chengyu import cy
from func_news import News
from func_tigerbot import TigerBot
@@ -36,6 +37,8 @@ class Robot(Job):
self.chat = ChatGPT(cgpt.get("key"), cgpt.get("api"), cgpt.get("proxy"), cgpt.get("prompt"))
elif self.config.XINGHUO_WEB:
self.chat = XinghuoWeb(self.config.XINGHUO_WEB)
elif self.config.CHATGLM:
self.chat = ChatGLM(wcf, self.config.CHATGLM)
else:
self.chat = None

0
tool/__init__.py Normal file
View File

88
tool/base.json Normal file
View File

@@ -0,0 +1,88 @@
{
"prompt": {
"3": {
"inputs": {
"seed": 1000573256060686,
"steps": 20,
"cfg": 8,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1,
"model": [
"4",
0
],
"positive": [
"6",
0
],
"negative": [
"7",
0
],
"latent_image": [
"5",
0
]
},
"class_type": "KSampler"
},
"4": {
"inputs": {
"ckpt_name": "(修复)512-inpainting-ema.safetensors"
},
"class_type": "CheckpointLoaderSimple"
},
"5": {
"inputs": {
"width": 512,
"height": 512,
"batch_size": 1
},
"class_type": "EmptyLatentImage"
},
"6": {
"inputs": {
"text": "beautiful scenery nature glass bottle landscape, , purple galaxy bottle,dress, ",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode"
},
"7": {
"inputs": {
"text": "text, watermark",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode"
},
"8": {
"inputs": {
"samples": [
"3",
0
],
"vae": [
"4",
2
]
},
"class_type": "VAEDecode"
},
"9": {
"inputs": {
"filename_prefix": "ComfyUI",
"images": [
"8",
0
]
},
"class_type": "SaveImage"
}
}
}

193
tool/code_kernel.py Normal file
View File

@@ -0,0 +1,193 @@
import base64
from io import BytesIO
import os
from pprint import pprint
import queue
import re
from subprocess import PIPE
import jupyter_client
from PIL import Image
import time
IPYKERNEL = os.environ.get('IPYKERNEL', 'chatglm3')
class CodeKernel(object):
def __init__(self,
kernel_name='kernel',
kernel_id=None,
kernel_config_path="",
python_path=None,
ipython_path=None,
init_file_path="./startup.py",
verbose=1):
self.kernel_name = kernel_name
self.kernel_id = kernel_id
self.kernel_config_path = kernel_config_path
self.python_path = python_path
self.ipython_path = ipython_path
self.init_file_path = init_file_path
self.verbose = verbose
if python_path is None and ipython_path is None:
env = None
else:
env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path}
# Initialize the backend kernel
self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL,
connection_file=self.kernel_config_path,
exec_files=[self.init_file_path],
env=env)
if self.kernel_config_path:
self.kernel_manager.load_connection_file()
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_config_path))
else:
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_manager.connection_file))
if verbose:
print(self.kernel_manager.get_connection_info())
# Initialize the code kernel
self.kernel = self.kernel_manager.blocking_client()
# self.kernel.load_connection_file()
self.kernel.start_channels()
print("Code kernel started.")
def execute(self, code):
self.kernel.execute(code)
try:
shell_msg = self.kernel.get_shell_msg(timeout=40)
io_msg_content = self.kernel.get_iopub_msg(timeout=40)['content']
while True:
msg_out = io_msg_content
### Poll the message
try:
io_msg_content = self.kernel.get_iopub_msg(timeout=40)['content']
if 'execution_state' in io_msg_content and io_msg_content['execution_state'] == 'idle':
break
except queue.Empty:
break
return shell_msg, msg_out
except Exception as e:
print(e)
return None
def execute_interactive(self, code, verbose=False):
shell_msg = self.kernel.execute_interactive(code)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def inspect(self, code, verbose=False):
msg_id = self.kernel.inspect(code)
shell_msg = self.kernel.get_shell_msg(timeout=30)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def get_error_msg(self, msg, verbose=False) -> str | None:
if msg['content']['status'] == 'error':
try:
error_msg = msg['content']['traceback']
except:
try:
error_msg = msg['content']['traceback'][-1].strip()
except:
error_msg = "Traceback Error"
if verbose:
print("Error: ", error_msg)
return error_msg
return None
def check_msg(self, msg, verbose=False):
status = msg['content']['status']
if status == 'ok':
if verbose:
print("Execution succeeded.")
elif status == 'error':
for line in msg['content']['traceback']:
if verbose:
print(line)
def shutdown(self):
# Shutdown the backend kernel
self.kernel_manager.shutdown_kernel()
print("Backend kernel shutdown.")
# Shutdown the code kernel
self.kernel.shutdown()
print("Code kernel shutdown.")
def restart(self):
# Restart the backend kernel
self.kernel_manager.restart_kernel()
# print("Backend kernel restarted.")
def interrupt(self):
# Interrupt the backend kernel
self.kernel_manager.interrupt_kernel()
# print("Backend kernel interrupted.")
def is_alive(self):
return self.kernel.is_alive()
def b64_2_img(data):
buff = BytesIO(base64.b64decode(data))
return Image.open(buff)
def clean_ansi_codes(input_string):
ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', input_string)
def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]:
res = ""
res_type = None
code = code.replace("<|observation|>", "")
code = code.replace("<|assistant|>interpreter", "")
code = code.replace("<|assistant|>", "")
code = code.replace("<|user|>", "")
code = code.replace("<|system|>", "")
msg, output = kernel.execute(code)
if msg['metadata']['status'] == "timeout":
return res_type, 'Timed out'
elif msg['metadata']['status'] == 'error':
return res_type, clean_ansi_codes('\n'.join(kernel.get_error_msg(msg, verbose=True)))
if 'text' in output:
res_type = "text"
res = output['text']
elif 'data' in output:
for key in output['data']:
if 'image/png' in key:
res_type = "image"
res = output['data'][key]
break
elif 'text/plain' in key:
res_type = "text"
res = output['data'][key]
if res_type == "image":
return res_type, b64_2_img(res)
elif res_type == "text" or res_type == "traceback":
res = res
return res_type, res
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]

177
tool/comfyUI_api.py Normal file
View File

@@ -0,0 +1,177 @@
#This is an example that uses the websockets api to know when a prompt execution is done
#Once the prompt execution is done it downloads the images using the /history endpoint
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import requests
import urllib
import random
import io
from PIL import Image
class ComfyUIApi():
def __init__(self, server_address="127.0.0.1:8188"):
self.server_address=server_address
self.client_id=str(uuid.uuid4())
self.ws = websocket.WebSocket()
self.ws.connect("ws://{}/ws?clientId={}".format(server_address, self.client_id))
def queue_prompt(self,prompt):
p = {"prompt": prompt, "client_id": self.client_id}
data = json.dumps(p).encode('utf-8')
req = requests.post("http://{}/prompt".format(self.server_address), data=data)
print(req.text)
return json.loads(req.text)
def get_image(self,filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with requests.get("http://{}/view?{}".format(self.server_address, url_values)) as response:
image = Image.open(io.BytesIO(response.content))
return image
def get_image_url(self,filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
return "http://{}/view?{}".format(self.server_address, url_values)
def get_history(self,prompt_id):
with requests.get("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
return json.loads(response.text)
def get_images(self,prompt,isUrl=False):
prompt_id = self.queue_prompt(prompt)['prompt_id']
output_images = []
while True:
out = self.ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break #Execution is done
else:
continue #previews are binary data
history = self.get_history(prompt_id)[prompt_id]
for o in history['outputs']:
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
if 'images' in node_output:
for image in node_output['images']:
image_data =self.get_image_url(image['filename'], image['subfolder'], image['type']) if isUrl else self.get_image(image['filename'], image['subfolder'], image['type'])
image['image']=image_data
output_images.append(image)
return output_images
prompt_text = """
{
"3": {
"class_type": "KSampler",
"inputs": {
"cfg": 8,
"denoise": 1,
"latent_image": [
"5",
0
],
"model": [
"4",
0
],
"negative": [
"7",
0
],
"positive": [
"6",
0
],
"sampler_name": "euler",
"scheduler": "normal",
"seed": 8566257,
"steps": 20
}
},
"4": {
"class_type": "CheckpointLoaderSimple",
"inputs": {
"ckpt_name": "chilloutmix_NiPrunedFp32Fix.safetensors"
}
},
"5": {
"class_type": "EmptyLatentImage",
"inputs": {
"batch_size": 1,
"height": 512,
"width": 512
}
},
"6": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": [
"4",
1
],
"text": "masterpiece best quality girl"
}
},
"7": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": [
"4",
1
],
"text": "bad hands"
}
},
"8": {
"class_type": "VAEDecode",
"inputs": {
"samples": [
"3",
0
],
"vae": [
"4",
2
]
}
},
"9": {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": "ComfyUI",
"images": [
"8",
0
]
}
}
}
"""
if __name__ == '__main__':
prompt = json.loads(prompt_text)
#set the text prompt for our positive CLIPTextEncode
prompt["6"]["inputs"]["text"] = "masterpiece best quality man"
#set the seed for our KSampler node
prompt["3"]["inputs"]["seed"] = ''.join(random.sample('123456789012345678901234567890',14))
cfui=ComfyUIApi()
images = cfui.get_images(prompt)
#Commented out code to display the output images:
for node_id in images:
for image_data in images[node_id]:
from PIL import Image
import io
image = Image.open(io.BytesIO(image_data))
image.show()

158
tool/tool_registry.py Normal file
View File

@@ -0,0 +1,158 @@
from copy import deepcopy
import inspect
from pprint import pformat
import traceback
from types import GenericAlias
from typing import get_origin, Annotated
import json
import requests
import random
import time
import re
from tool.comfyUI_api import ComfyUIApi
from func_news import News
from zhdate import ZhDate
from datetime import datetime
_TOOL_HOOKS = {}
_TOOL_DESCRIPTIONS = {}
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]
def register_tool(func: callable):
tool_name = func.__name__
tool_description = inspect.getdoc(func).strip()
python_params = inspect.signature(func).parameters
tool_params = []
for name, param in python_params.items():
annotation = param.annotation
if annotation is inspect.Parameter.empty:
raise TypeError(f"Parameter `{name}` missing type annotation")
if get_origin(annotation) != Annotated:
raise TypeError(f"Annotation type for `{name}` must be typing.Annotated")
typ, (description, required) = annotation.__origin__, annotation.__metadata__
typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__
if not isinstance(description, str):
raise TypeError(f"Description for `{name}` must be a string")
if not isinstance(required, bool):
raise TypeError(f"Required for `{name}` must be a bool")
tool_params.append({
"name": name,
"description": description,
"type": typ,
"required": required
})
tool_def = {
"name": tool_name,
"description": tool_description,
"params": tool_params
}
#print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
return func
def dispatch_tool(tool_name: str, tool_params: dict) -> str:
if tool_name not in _TOOL_HOOKS:
return f"Tool `{tool_name}` not found. Please use a provided tool."
tool_call = _TOOL_HOOKS[tool_name]
try:
ret = tool_call(**tool_params)
except:
ret = traceback.format_exc()
return ret
def get_tools() -> dict:
return deepcopy(_TOOL_DESCRIPTIONS)
# Tool Definitions
# @register_tool
# def random_number_generator(
# seed: Annotated[int, 'The random seed used by the generator', True],
# range: Annotated[tuple[int, int], 'The range of the generated numbers', True],
# ) -> int:
# """
# Generates a random number x, s.t. range[0] <= x < range[1]
# """
# if not isinstance(seed, int):
# raise TypeError("Seed must be an integer")
# if not isinstance(range, tuple):
# raise TypeError("Range must be a tuple")
# if not isinstance(range[0], int) or not isinstance(range[1], int):
# raise TypeError("Range must be a tuple of integers")
# import random
# return random.Random(seed).randint(*range)
@register_tool
def get_weather(
city_name: Annotated[str, 'The name of the city to be queried', True],
) -> str:
"""
Get the current weather for `city_name`
"""
if not isinstance(city_name, str):
raise TypeError("City name must be a string")
key_selection = {
"current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
}
import requests
try:
resp = requests.get(f"https://wttr.in/{city_name}?format=j1")
resp.raise_for_status()
resp = resp.json()
ret = {k: {_v: resp[k][0][_v] for _v in v} for k, v in key_selection.items()}
except:
import traceback
ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
return str(ret)
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("tool\\base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed']=''.join(random.sample('123456789012345678901234567890',14))
data2['prompt']['4']['inputs']['ckpt_name']='chilloutmix_NiPrunedFp32Fix.safetensors' #模型名称
data2['prompt']['6']['inputs']['text']=prompt #正向提示词
#data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui=ComfyUIApi(server_address="127.0.0.1:8188") #根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res':images[0]['image'],'res_type':'image','filename':images[0]['filename']}
@register_tool
def get_news() -> str:
'''
获取最新新闻
'''
news = News()
return news.get_important_news()
@register_tool
def get_time() -> str:
'''
获取当前日期,时间,农历日期,星期几
'''
time=datetime.now()
date2 = ZhDate.from_datetime(time)
week_list = ["星期一","星期二","星期三","星期四","星期五","星期六","星期日"]
return '{} {} {}'.format(time.strftime("%Y年%m月%d%H:%M:%S"),week_list[time.weekday()],'农历:'+date2.chinese())
if __name__ == "__main__":
print(dispatch_tool("get_weather", {"city_name": "beijing"}))
print(get_tools())