fix(ai_auto_response): avoid forced split of single short replies
This commit is contained in:
@@ -12,11 +12,11 @@ def finalize_reply(response: str, reply_mode: str) -> List[str]:
|
||||
text = text.replace("\n", " ").strip()
|
||||
|
||||
if reply_mode == "social_short":
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=24, chunk_limit=2)
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=24, chunk_limit=2, allow_clip_split=False)
|
||||
if reply_mode == "qa_fast":
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=32, chunk_limit=2)
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=32, chunk_limit=2, allow_clip_split=False)
|
||||
if reply_mode == "qa_with_context":
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=40, chunk_limit=2)
|
||||
return split_reply_chunks(text, sentence_limit=2, char_limit=40, chunk_limit=2, allow_clip_split=False)
|
||||
return [take_first_sentence(text, 28).strip()]
|
||||
|
||||
|
||||
@@ -45,14 +45,22 @@ def take_first_sentence(text: str, limit: int) -> str:
|
||||
return smart_clip(first, limit)
|
||||
|
||||
|
||||
def split_reply_chunks(text: str, sentence_limit: int, char_limit: int, chunk_limit: int) -> List[str]:
|
||||
def split_reply_chunks(
|
||||
text: str,
|
||||
sentence_limit: int,
|
||||
char_limit: int,
|
||||
chunk_limit: int,
|
||||
allow_clip_split: bool = True,
|
||||
) -> List[str]:
|
||||
parts = [item.strip() for item in re.split(r"(?<=[。!?!?;;])", text) if item.strip()]
|
||||
if not parts:
|
||||
short = text.strip()
|
||||
clipped = smart_clip(short, char_limit)
|
||||
remainder = short[len(clipped):].strip(",,、;;:: ")
|
||||
if not short:
|
||||
return []
|
||||
if not allow_clip_split:
|
||||
return [clipped] if clipped else []
|
||||
remainder = short[len(clipped):].strip(",,、;;:: ")
|
||||
return [item for item in [clipped, smart_clip(remainder, char_limit)] if item][:chunk_limit]
|
||||
|
||||
chunks: List[str] = []
|
||||
@@ -67,6 +75,8 @@ def split_reply_chunks(text: str, sentence_limit: int, char_limit: int, chunk_li
|
||||
clipped = current[:char_limit].rstrip(",,、;;:: ").strip()
|
||||
if clipped:
|
||||
chunks.append(clipped)
|
||||
if not allow_clip_split:
|
||||
break
|
||||
current = current[len(clipped):].strip(",,、;;:: ")
|
||||
return chunks[:chunk_limit] or [smart_clip(text, char_limit)]
|
||||
|
||||
|
||||
BIN
test/hifi_clone.wav
Normal file
BIN
test/hifi_clone.wav
Normal file
Binary file not shown.
BIN
test/lzl.mp3
Normal file
BIN
test/lzl.mp3
Normal file
Binary file not shown.
BIN
test/lzl.wav
Normal file
BIN
test/lzl.wav
Normal file
Binary file not shown.
BIN
test/lzl_prompt_cache.pt
Normal file
BIN
test/lzl_prompt_cache.pt
Normal file
Binary file not shown.
BIN
test/voice_design.wav
Normal file
BIN
test/voice_design.wav
Normal file
Binary file not shown.
67
test/voxcpm_test.py
Normal file
67
test/voxcpm_test.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""Smoke-test script for VoxCPM voice design and prompt-cached voice cloning.

Running this file loads the ``openbmb/VoxCPM2`` checkpoint on CUDA, then:

1. synthesizes a short "voice design" sample and writes it to
   ``voice_design.wav``;
2. loads the prompt cache from ``lzl_prompt_cache.pt`` if that file exists,
   otherwise builds it from ``lzl.wav`` plus its transcript and saves it;
3. synthesizes ``TARGET_TEXT`` with the prompt cache and writes the result
   to ``hifi_clone.wav``.

All paths are resolved relative to this file's directory.
"""

import sys
from pathlib import Path

import soundfile as sf
import torch
from voxcpm import VoxCPM

# NOTE(review): everything below runs at import time, and this file's name
# matches pytest's default ``*_test.py`` discovery pattern, so collecting this
# directory would execute the whole script. Consider moving the flow into a
# ``main()`` behind ``if __name__ == "__main__":`` -- confirm nothing relies
# on import-time execution first.

BASE_DIR = Path(__file__).resolve().parent
DEVICE = "cuda"
PROMPT_WAV = BASE_DIR / "lzl.wav"
CACHE_PATH = BASE_DIR / "lzl_prompt_cache.pt"
VOICE_DESIGN_PATH = BASE_DIR / "voice_design.wav"
HIFI_CLONE_PATH = BASE_DIR / "hifi_clone.wav"
# Transcript passed as ``prompt_text`` together with PROMPT_WAV when the
# prompt cache is built.
PROMPT_TEXT = "亲爱的,今天的你要出发,挣钱喽,都没有系好安全带。 乖乖仔,我要到了,奇怪还没有分开就开始想你了,注意一下,闯红灯拍照,我可不喜欢明知孤犯的小坏蛋,安全带系一好,我们这边要出发喽。 小坏蛋,前方有限速拍照,姐姐给你盯着呢,车速太快了啊,慢一点慢一点降下来,不要让我害怕好吗? 过最堵的路段,千万不要着急,姐姐会一直陪着你。 今日导航就先到这里了,哥哥注意安全停车哦。"
TARGET_TEXT = "慢慢来吧,额度还在就好~"

# Guard against someone editing DEVICE: the rest of the script assumes CUDA.
if DEVICE != "cuda":
    raise RuntimeError(f"Unsupported device: {DEVICE}. This script only supports CUDA.")

if not torch.cuda.is_available():
    raise RuntimeError(
        "This script requires CUDA. "
        f"Current python: {sys.executable}, torch: {torch.__version__}. "
        "Please run it with a CUDA-enabled Python environment."
    )

model = VoxCPM.from_pretrained("openbmb/VoxCPM2", load_denoiser=False)

# Compare device *types* rather than raw values: a direct ``!= "cuda"`` test
# would misfire if the attribute is a ``torch.device`` or an indexed string
# such as "cuda:0". ``torch.device(...)`` accepts both strings and devices.
model_device = torch.device(model.tts_model.device)
if model_device.type != DEVICE:
    raise RuntimeError(f"Expected VoxCPM to run on {DEVICE}, got {model.tts_model.device}")
print(f"VoxCPM loaded on {model.tts_model.device} with torch {torch.__version__}")

# Step 1: voice design -- the parenthesized prefix in ``text`` describes the
# desired voice; no reference audio is supplied for this call.
wav = model.generate(
    text="(A young woman, gentle and sweet voice)Hello, welcome to VoxCPM!",
    cfg_value=2.0,
    inference_timesteps=10,
)
sf.write(str(VOICE_DESIGN_PATH), wav, model.tts_model.sample_rate)
print(f"Saved voice design to {VOICE_DESIGN_PATH}")

# Step 2: build and persist the prompt cache so later runs can skip prompt
# encoding.
if CACHE_PATH.exists():
    # NOTE(review): ``torch.load`` unpickles the file, so only load caches
    # written by this script. Tensors are mapped to CPU here; presumably
    # ``generate_with_prompt_cache`` moves them to the model device --
    # confirm against the device printout below.
    prompt_cache = torch.load(CACHE_PATH, map_location="cpu")
    print(f"Loaded prompt cache from {CACHE_PATH}")
else:
    # Fail early with a clear message instead of an error from deep inside
    # the audio loading code.
    if not PROMPT_WAV.is_file():
        raise FileNotFoundError(f"Prompt audio not found: {PROMPT_WAV}")
    prompt_cache = model.tts_model.build_prompt_cache(
        prompt_wav_path=str(PROMPT_WAV),
        prompt_text=PROMPT_TEXT,
        reference_wav_path=str(PROMPT_WAV),
    )
    torch.save(prompt_cache, CACHE_PATH)
    print(f"Built and saved prompt cache to {CACHE_PATH}")

# Diagnostic: report where each tensor entry of the cache lives (non-tensor
# entries are skipped).
cache_devices = {
    key: str(value.device)
    for key, value in prompt_cache.items()
    if isinstance(value, torch.Tensor)
}
print(f"Prompt cache tensor devices: {cache_devices}")

# Step 3: clone the prompt voice for TARGET_TEXT. Only the first element of
# the returned 3-tuple (the audio tensor) is used.
wav, _, _ = model.tts_model.generate_with_prompt_cache(
    target_text=TARGET_TEXT,
    prompt_cache=prompt_cache,
    cfg_value=2.0,
    inference_timesteps=10,
)
# Detach from autograd, move to CPU, cast to float32 and drop dimension 0
# (when it has size 1) so soundfile receives a plain NumPy array.
wav = wav.detach().cpu().float().squeeze(0).numpy()
sf.write(str(HIFI_CLONE_PATH), wav, model.tts_model.sample_rate)
print(f"Saved cloned audio to {HIFI_CLONE_PATH}")
|
||||
Reference in New Issue
Block a user