import inspect import json import random import re import time import traceback from copy import deepcopy from datetime import datetime from pprint import pformat from types import GenericAlias from typing import Annotated, get_origin import requests from func_news import News from zhdate import ZhDate from chatglm.comfyUI_api import ComfyUIApi _TOOL_HOOKS = {} _TOOL_DESCRIPTIONS = {} def extract_code(text: str) -> str: pattern = r'```([^\n]*)\n(.*?)```' matches = re.findall(pattern, text, re.DOTALL) return matches[-1][1] def register_tool(func: callable): tool_name = func.__name__ tool_description = inspect.getdoc(func).strip() python_params = inspect.signature(func).parameters tool_params = [] for name, param in python_params.items(): annotation = param.annotation if annotation is inspect.Parameter.empty: raise TypeError(f"Parameter `{name}` missing type annotation") if get_origin(annotation) != Annotated: raise TypeError( f"Annotation type for `{name}` must be typing.Annotated") typ, (description, required) = annotation.__origin__, annotation.__metadata__ typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__ if not isinstance(description, str): raise TypeError(f"Description for `{name}` must be a string") if not isinstance(required, bool): raise TypeError(f"Required for `{name}` must be a bool") tool_params.append({ "name": name, "description": description, "type": typ, "required": required }) tool_def = { "name": tool_name, "description": tool_description, "params": tool_params } # print("[registered tool] " + pformat(tool_def)) _TOOL_HOOKS[tool_name] = func _TOOL_DESCRIPTIONS[tool_name] = tool_def return func def dispatch_tool(tool_name: str, tool_params: dict) -> str: if tool_name not in _TOOL_HOOKS: return f"Tool `{tool_name}` not found. Please use a provided tool." tool_call = _TOOL_HOOKS[tool_name] try: ret = tool_call(**tool_params) except BaseException: ret = traceback.format_exc() return ret def get_tools() -> dict: return deepcopy(_TOOL_DESCRIPTIONS) # Tool Definitions # @register_tool # def random_number_generator( # seed: Annotated[int, 'The random seed used by the generator', True], # range: Annotated[tuple[int, int], 'The range of the generated numbers', True], # ) -> int: # """ # Generates a random number x, s.t. range[0] <= x < range[1] # """ # if not isinstance(seed, int): # raise TypeError("Seed must be an integer") # if not isinstance(range, tuple): # raise TypeError("Range must be a tuple") # if not isinstance(range[0], int) or not isinstance(range[1], int): # raise TypeError("Range must be a tuple of integers") # import random # return random.Random(seed).randint(*range) @register_tool def get_weather( city_name: Annotated[str, 'The name of the city to be queried', True], ) -> str: """ Get the current weather for `city_name` """ if not isinstance(city_name, str): raise TypeError("City name must be a string") key_selection = { "current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"], } import requests try: resp = requests.get(f"https://wttr.in/{city_name}?format=j1") resp.raise_for_status() resp = resp.json() ret = {k: {_v: resp[k][0][_v] for _v in v} for k, v in key_selection.items()} except BaseException: import traceback ret = "Error encountered while fetching weather data!\n" + traceback.format_exc() return str(ret) @register_tool def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict: ''' 生成图片 ''' with open("chatglm\\base.json", "r", encoding="utf-8") as f: data2 = json.load(f) data2['prompt']['3']['inputs']['seed'] = ''.join( random.sample('123456789012345678901234567890', 14)) # 模型名称 data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors' data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词 # data2['prompt']['7']['inputs']['text']='' #反向提示词 cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改 images = cfui.get_images(data2['prompt']) return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']} @register_tool def get_news() -> str: ''' 获取最新新闻 ''' news = News() return news.get_important_news() @register_tool def get_time() -> str: ''' 获取当前日期,时间,农历日期,星期几 ''' time = datetime.now() date2 = ZhDate.from_datetime(time) week_list = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"] return '{} {} {}'.format(time.strftime("%Y年%m月%d日 %H:%M:%S"), week_list[time.weekday()], '农历:' + date2.chinese()) if __name__ == "__main__": print(dispatch_tool("get_weather", {"city_name": "beijing"})) print(get_tools())