from copy import deepcopy import inspect from pprint import pformat import traceback from types import GenericAlias from typing import get_origin, Annotated import json import requests import random import time import re from tool.comfyUI_api import ComfyUIApi from func_news import News from zhdate import ZhDate from datetime import datetime _TOOL_HOOKS = {} _TOOL_DESCRIPTIONS = {} def extract_code(text: str) -> str: pattern = r'```([^\n]*)\n(.*?)```' matches = re.findall(pattern, text, re.DOTALL) return matches[-1][1] def register_tool(func: callable): tool_name = func.__name__ tool_description = inspect.getdoc(func).strip() python_params = inspect.signature(func).parameters tool_params = [] for name, param in python_params.items(): annotation = param.annotation if annotation is inspect.Parameter.empty: raise TypeError(f"Parameter `{name}` missing type annotation") if get_origin(annotation) != Annotated: raise TypeError(f"Annotation type for `{name}` must be typing.Annotated") typ, (description, required) = annotation.__origin__, annotation.__metadata__ typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__ if not isinstance(description, str): raise TypeError(f"Description for `{name}` must be a string") if not isinstance(required, bool): raise TypeError(f"Required for `{name}` must be a bool") tool_params.append({ "name": name, "description": description, "type": typ, "required": required }) tool_def = { "name": tool_name, "description": tool_description, "params": tool_params } #print("[registered tool] " + pformat(tool_def)) _TOOL_HOOKS[tool_name] = func _TOOL_DESCRIPTIONS[tool_name] = tool_def return func def dispatch_tool(tool_name: str, tool_params: dict) -> str: if tool_name not in _TOOL_HOOKS: return f"Tool `{tool_name}` not found. Please use a provided tool." tool_call = _TOOL_HOOKS[tool_name] try: ret = tool_call(**tool_params) except: ret = traceback.format_exc() return ret def get_tools() -> dict: return deepcopy(_TOOL_DESCRIPTIONS) # Tool Definitions # @register_tool # def random_number_generator( # seed: Annotated[int, 'The random seed used by the generator', True], # range: Annotated[tuple[int, int], 'The range of the generated numbers', True], # ) -> int: # """ # Generates a random number x, s.t. range[0] <= x < range[1] # """ # if not isinstance(seed, int): # raise TypeError("Seed must be an integer") # if not isinstance(range, tuple): # raise TypeError("Range must be a tuple") # if not isinstance(range[0], int) or not isinstance(range[1], int): # raise TypeError("Range must be a tuple of integers") # import random # return random.Random(seed).randint(*range) @register_tool def get_weather( city_name: Annotated[str, 'The name of the city to be queried', True], ) -> str: """ Get the current weather for `city_name` """ if not isinstance(city_name, str): raise TypeError("City name must be a string") key_selection = { "current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"], } import requests try: resp = requests.get(f"https://wttr.in/{city_name}?format=j1") resp.raise_for_status() resp = resp.json() ret = {k: {_v: resp[k][0][_v] for _v in v} for k, v in key_selection.items()} except: import traceback ret = "Error encountered while fetching weather data!\n" + traceback.format_exc() return str(ret) @register_tool def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict: ''' 生成图片 ''' with open("tool\\base.json", "r", encoding="utf-8") as f: data2 = json.load(f) data2['prompt']['3']['inputs']['seed']=''.join(random.sample('123456789012345678901234567890',14)) data2['prompt']['4']['inputs']['ckpt_name']='chilloutmix_NiPrunedFp32Fix.safetensors' #模型名称 data2['prompt']['6']['inputs']['text']=prompt #正向提示词 #data2['prompt']['7']['inputs']['text']='' #反向提示词 cfui=ComfyUIApi(server_address="127.0.0.1:8188") #根据自己comfyUI地址修改 images = cfui.get_images(data2['prompt']) return {'res':images[0]['image'],'res_type':'image','filename':images[0]['filename']} @register_tool def get_news() -> str: ''' 获取最新新闻 ''' news = News() return news.get_important_news() @register_tool def get_time() -> str: ''' 获取当前日期,时间,农历日期,星期几 ''' time=datetime.now() date2 = ZhDate.from_datetime(time) week_list = ["星期一","星期二","星期三","星期四","星期五","星期六","星期日"] return '{} {} {}'.format(time.strftime("%Y年%m月%d日 %H:%M:%S"),week_list[time.weekday()],'农历:'+date2.chinese()) if __name__ == "__main__": print(dispatch_tool("get_weather", {"city_name": "beijing"})) print(get_tools())